diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 5925fa2..faa1571 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -47,7 +47,8 @@ int db::high_priority_service::adopt_transaction(const wsrep::transaction&) int db::high_priority_service::apply_write_set( const wsrep::ws_meta&, - const wsrep::const_buffer&) + const wsrep::const_buffer&, + wsrep::mutable_buffer&) { client_.se_trx_.start(&client_); client_.se_trx_.apply(client_.client_state().transaction()); @@ -56,7 +57,8 @@ int db::high_priority_service::apply_write_set( int db::high_priority_service::apply_toi( const wsrep::ws_meta&, - const wsrep::const_buffer&) + const wsrep::const_buffer&, + wsrep::mutable_buffer&) { throw wsrep::not_implemented_error(); } @@ -84,6 +86,11 @@ int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, return ret; } +void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err) +{ + client_.client_state_.adopt_apply_error(err); +} + void db::high_priority_service::after_apply() { client_.client_state_.after_applying(); @@ -91,17 +98,22 @@ void db::high_priority_service::after_apply() int db::high_priority_service::log_dummy_write_set( const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) + const wsrep::ws_meta& ws_meta, + wsrep::mutable_buffer& err) { int ret(client_.client_state_.start_transaction(ws_handle, ws_meta)); assert(ret == 0); - client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); - ret = client_.client_state_.before_commit(); - assert(ret == 0); - ret = client_.client_state_.ordered_commit(); - assert(ret == ret); - ret = client_.client_state_.after_commit(); - assert(ret == 0); + if (ws_meta.ordered()) + { + client_.client_state_.adopt_apply_error(err); + client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); + ret = client_.client_state_.before_commit(); + assert(ret == 0); + ret = client_.client_state_.ordered_commit(); + assert(ret == 0); + ret = client_.client_state_.after_commit(); + assert(ret == 0); + } client_.client_state_.after_applying(); return ret; } diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 279ac69..8f16234 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -35,7 +35,8 @@ namespace db const wsrep::transaction& transaction() const override; int adopt_transaction(const wsrep::transaction&) override; int apply_write_set(const wsrep::ws_meta&, - const wsrep::const_buffer&) override; + const wsrep::const_buffer&, + wsrep::mutable_buffer&) override; int append_fragment_and_commit( const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&) @@ -45,14 +46,17 @@ namespace db { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; + int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&) override; + void adopt_apply_error(wsrep::mutable_buffer&) override; virtual void after_apply() override; void store_globals() override { } void reset_globals() override { } void switch_execution_context(wsrep::high_priority_service&) override { } int log_dummy_write_set(const wsrep::ws_handle&, - const wsrep::ws_meta&) override; + const wsrep::ws_meta&, + wsrep::mutable_buffer&) override; virtual bool is_replaying() const override; void debug_crash(const char*) override { } private: @@ -71,7 +75,7 @@ namespace db // After apply is empty for replayer to keep the transaction // context available for the client session after replaying // is over. - void after_apply() override { } + void after_apply() override {} bool is_replaying() const override { return true; } }; diff --git a/include/wsrep/buffer.hpp b/include/wsrep/buffer.hpp index e737b65..7eaea75 100644 --- a/include/wsrep/buffer.hpp +++ b/include/wsrep/buffer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -38,13 +38,22 @@ namespace wsrep , size_(size) { } + const_buffer(const const_buffer& b) + : ptr_(b.ptr()) + , size_(b.size()) + { } + const void* ptr() const { return ptr_; } - const void* data() const { return ptr_; } + const char* data() const { return static_cast(ptr_); } size_t size() const { return size_; } + const_buffer& operator=(const const_buffer& b) + { + ptr_ = b.ptr(); + size_ = b.size(); + return *this; + } private: - // const_buffer(const const_buffer&); - // const_buffer& operator=(const const_buffer&); const void* ptr_; size_t size_; }; @@ -57,16 +66,36 @@ namespace wsrep : buffer_() { } + void resize(size_t s) { buffer_.resize(s); } + + void clear() + { + // using swap to ensure deallocation + std::vector().swap(buffer_); + } + void push_back(const char* begin, const char* end) { buffer_.insert(buffer_.end(), begin, end); } - const char* data() const { return &buffer_[0]; } + + template void push_back(const C& c) + { + std::copy(c.begin(), c.end(), std::back_inserter(buffer_)); + } + size_t size() const { return buffer_.size(); } + char* data() { return &buffer_[0]; } + const char* data() const { return &buffer_[0]; } + + mutable_buffer& operator= (const mutable_buffer& other) + { + buffer_ = other.buffer_; + return *this; + } private: std::vector buffer_; }; - } #endif // WSREP_BUFFER_HPP diff --git a/include/wsrep/client_id.hpp b/include/wsrep/client_id.hpp index faa407e..2861d8e 100644 --- a/include/wsrep/client_id.hpp +++ b/include/wsrep/client_id.hpp @@ -41,6 +41,10 @@ namespace wsrep { return (id_ < other.id_); } + bool operator==(const client_id& other) const + { + return (id_ == other.id_); + } private: type id_; }; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index c18bdf5..9735048 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -90,6 +90,8 @@ namespace wsrep */ enum mode { + /** undefined mode */ + m_undefined, /** Locally operating client session. */ m_local, /** High priority mode */ @@ -247,6 +249,8 @@ namespace wsrep /** * Perform cleanup after applying a transaction. + * + * @param err Applying error (empty for no error) */ void after_applying() { @@ -508,6 +512,15 @@ namespace wsrep transaction_.adopt(transaction); } + /** + * Adopt (store) transaction applying error for further processing. + */ + void adopt_apply_error(wsrep::mutable_buffer& err) + { + assert(mode_ == m_high_priority); + transaction_.adopt_apply_error(err); + } + /** * Clone enough state from another transaction so that replaing will * be possible with a transaction contained in this client state. @@ -546,15 +559,15 @@ namespace wsrep * * @return Zero on success, non-zero otherwise. */ - int enter_toi(const wsrep::key_array& key_array, - const wsrep::const_buffer& buffer, - int flags); + int enter_toi_local(const wsrep::key_array& key_array, + const wsrep::const_buffer& buffer, + int flags); /** - * Enter applying total order critical section. + * Enter applier TOI mode * * @param ws_meta Write set meta data */ - int enter_toi(const wsrep::ws_meta& ws_meta); + void enter_toi_mode(const wsrep::ws_meta& ws_meta); /** * Return true if the client_state is under TOI operation. @@ -573,10 +586,20 @@ namespace wsrep { return toi_mode_; } + /** * Leave total order isolation critical section. + * (for local mode clients) + * + * @param err definition of the error that happened during the + * execution of TOI operation (empty for no error) */ - int leave_toi(); + int leave_toi_local(const wsrep::mutable_buffer& err); + + /** + * Leave applier TOI mode. + */ + void leave_toi_mode(); /** * Begin rolling schema upgrade operation. @@ -770,7 +793,7 @@ namespace wsrep , client_service_(client_service) , id_(id) , mode_(mode) - , toi_mode_() + , toi_mode_(m_undefined) , state_(s_none) , state_hist_() , transaction_(*this) @@ -804,6 +827,9 @@ namespace wsrep enum wsrep::provider::status status = wsrep::provider::success); + void enter_toi_common(); + void leave_toi_common(); + wsrep::thread::id owning_thread_id_; wsrep::thread::id current_thread_id_; bool has_rollbacker_; @@ -864,6 +890,7 @@ namespace wsrep { switch (mode) { + case wsrep::client_state::m_undefined: return "undefined"; case wsrep::client_state::m_local: return "local"; case wsrep::client_state::m_high_priority: return "high priority"; case wsrep::client_state::m_toi: return "toi"; diff --git a/include/wsrep/gtid.hpp b/include/wsrep/gtid.hpp index 8b5afb0..043dc48 100644 --- a/include/wsrep/gtid.hpp +++ b/include/wsrep/gtid.hpp @@ -48,6 +48,13 @@ namespace wsrep { return undefined_; } + bool operator==(const gtid& other) const + { + return ( + seqno_ == other.seqno_ && + id_ == other.id_ + ); + } private: static const wsrep::gtid undefined_; wsrep::id id_; diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 3989656..f48f87f 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -41,7 +41,7 @@ namespace wsrep virtual ~high_priority_service() { } int apply(const ws_handle& ws_handle, const ws_meta& ws_meta, - const const_buffer& data) + const const_buffer& data) { return server_state_.on_apply(*this, ws_handle, ws_meta, data); } @@ -70,9 +70,13 @@ namespace wsrep * new transaction before applying a write set and must * either commit to make changes persistent or roll back. * + * @params ws_meta Write set meta data + * @params ws Write set buffer + * @params err Buffer to store error data */ - virtual int apply_write_set(const wsrep::ws_meta&, - const wsrep::const_buffer&) = 0; + virtual int apply_write_set(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& ws, + wsrep::mutable_buffer& err) = 0; /** * Append a fragment into fragment storage. This will be @@ -141,9 +145,14 @@ namespace wsrep * * TOI operation is a standalone operation and should not * be executed as a part of a transaction. + * + * @params ws_meta Write set meta data + * @params ws Write set buffer + * @params err Buffer to store error data */ - virtual int apply_toi(const wsrep::ws_meta&, - const wsrep::const_buffer&) = 0; + virtual int apply_toi(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& ws, + wsrep::mutable_buffer& err) = 0; /** * Actions to take after applying a write set was completed. @@ -175,11 +184,19 @@ namespace wsrep * * @params ws_handle Write set handle * @params ws_meta Write set meta data + * @params err Optional applying error data buffer, may be modified * * @return Zero in case of success, non-zero on failure */ virtual int log_dummy_write_set(const ws_handle& ws_handle, - const ws_meta& ws_meta) = 0; + const ws_meta& ws_meta, + wsrep::mutable_buffer& err) = 0; + + /** + * Adopt (store) apply error description for further reporting + * to provider, source buffer may be modified. + */ + virtual void adopt_apply_error(wsrep::mutable_buffer& err) = 0; virtual bool is_replaying() const = 0; @@ -189,6 +206,7 @@ namespace wsrep * Debug facility to crash the server at given point. */ virtual void debug_crash(const char* crash_point) = 0; + protected: wsrep::server_state& server_state_; bool must_exit_; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 7e6a166..8d57283 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -56,6 +56,14 @@ namespace wsrep wsrep::transaction_id transaction_id() const { return transaction_id_; } wsrep::client_id client_id() const { return client_id_; } + bool operator==(const stid& other) const + { + return ( + server_id_ == other.server_id_ && + transaction_id_ == other.transaction_id_ && + client_id_ == other.client_id_ + ); + } private: wsrep::id server_id_; wsrep::transaction_id transaction_id_; @@ -84,6 +92,13 @@ namespace wsrep void* opaque() const { return opaque_; } + bool operator==(const ws_handle& other) const + { + return ( + transaction_id_ == other.transaction_id_ && + opaque_ == other.opaque_ + ); + } private: wsrep::transaction_id transaction_id_; void* opaque_; @@ -134,9 +149,21 @@ namespace wsrep return stid_.transaction_id(); } + bool ordered() const { return !gtid_.is_undefined(); } + wsrep::seqno depends_on() const { return depends_on_; } int flags() const { return flags_; } + + bool operator==(const ws_meta& other) const + { + return ( + gtid_ == other.gtid_ && + stid_ == other.stid_ && + depends_on_ == other.depends_on_ && + flags_ == other.flags_ + ); + } private: wsrep::gtid gtid_; wsrep::stid stid_; @@ -300,7 +327,8 @@ namespace wsrep virtual enum status commit_order_enter(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; virtual int commit_order_leave(const wsrep::ws_handle&, - const wsrep::ws_meta&) = 0; + const wsrep::ws_meta&, + const wsrep::mutable_buffer& err) = 0; virtual int release(wsrep::ws_handle&) = 0; /** @@ -325,7 +353,8 @@ namespace wsrep /** * Leave total order isolation critical section */ - virtual enum status leave_toi(wsrep::client_id) = 0; + virtual enum status leave_toi(wsrep::client_id, + const wsrep::mutable_buffer& err) = 0; /** * Perform a causal read on cluster. diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index f282ead..6c78cd8 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -27,6 +27,7 @@ #include "streaming_context.hpp" #include "lock.hpp" #include "sr_key_set.hpp" +#include "buffer.hpp" #include #include @@ -198,6 +199,12 @@ namespace wsrep { return streaming_context_; } wsrep::streaming_context& streaming_context() { return streaming_context_; } + void adopt_apply_error(wsrep::mutable_buffer& buf) + { + apply_error_buf_ = std::move(buf); + } + const wsrep::mutable_buffer& apply_error() const + { return apply_error_buf_; } private: transaction(const transaction&); transaction operator=(const transaction&); @@ -214,6 +221,7 @@ namespace wsrep int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); int append_sr_keys_for_commit(); + int release_commit_order(wsrep::unique_lock&); void streaming_rollback(wsrep::unique_lock&); void clear_fragments(); void cleanup(); @@ -240,6 +248,7 @@ namespace wsrep size_t fragments_certified_for_statement_; wsrep::streaming_context streaming_context_; wsrep::sr_key_set sr_keys_; + wsrep::mutable_buffer apply_error_buf_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/client_state.cpp b/src/client_state.cpp index 540ebca..a5fa52f 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -299,10 +299,16 @@ void wsrep::client_state::disable_streaming() // TOI // ////////////////////////////////////////////////////////////////////////////// +void wsrep::client_state::enter_toi_common() +{ + wsrep::unique_lock lock(mutex_); + toi_mode_ = mode_; + mode(lock, m_toi); +} -int wsrep::client_state::enter_toi(const wsrep::key_array& keys, - const wsrep::const_buffer& buffer, - int flags) +int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + int flags) { assert(state_ == s_exec); assert(mode_ == m_local); @@ -311,9 +317,7 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys, { case wsrep::provider::success: { - wsrep::unique_lock lock(mutex_); - toi_mode_ = mode_; - mode(lock, m_toi); + enter_toi_common(); ret = 0; break; } @@ -326,43 +330,37 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys, return ret; } -int wsrep::client_state::enter_toi(const wsrep::ws_meta& ws_meta) +void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta) { - wsrep::unique_lock lock(mutex_); assert(mode_ == m_high_priority); - toi_mode_ = mode_; - mode(lock, m_toi); + enter_toi_common(); toi_meta_ = ws_meta; - return 0; } -int wsrep::client_state::leave_toi() +void wsrep::client_state::leave_toi_common() { - int ret(0); - if (toi_mode_ == m_local) - { - switch (provider().leave_toi(id_)) - { - case wsrep::provider::success: - break; - default: - assert(0); - override_error(wsrep::e_error_during_commit, - wsrep::provider::error_unknown); - ret = 1; - break; - } - } wsrep::unique_lock lock(mutex_); mode(lock, toi_mode_); - toi_mode_ = m_local; + toi_mode_ = m_undefined; if (toi_meta_.gtid().is_undefined() == false) { update_last_written_gtid(toi_meta_.gtid()); } toi_meta_ = wsrep::ws_meta(); +} - return ret; +int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err) +{ + assert(toi_mode_ == m_local); + leave_toi_common(); + + return (provider().leave_toi(id_, err) == provider::success ? 0 : 1); +} + +void wsrep::client_state::leave_toi_mode() +{ + assert(toi_mode_ == m_high_priority); + leave_toi_common(); } /////////////////////////////////////////////////////////////////////////////// @@ -449,7 +447,7 @@ void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid) { assert(last_written_gtid_.is_undefined() || (last_written_gtid_.id() == gtid.id() && - last_written_gtid_.seqno() < gtid.seqno())); + !(last_written_gtid_.seqno() > gtid.seqno()))); last_written_gtid_ = gtid; } @@ -513,12 +511,14 @@ void wsrep::client_state::mode( enum mode mode) { assert(lock.owns_lock()); + static const char allowed[n_modes_][n_modes_] = - { /* l h t r */ - { 0, 1, 1, 1 }, /* local */ - { 1, 0, 1, 0 }, /* high prio */ - { 1, 1, 0, 0 }, /* toi */ - { 1, 0, 0, 0 } /* rsu */ + { /* u l h t r */ + { 0, 0, 0, 0, 0 }, /* undefined */ + { 0, 0, 1, 1, 1 }, /* local */ + { 0, 1, 0, 1, 0 }, /* high prio */ + { 0, 1, 1, 0, 0 }, /* toi */ + { 0, 1, 0, 0, 0 } /* rsu */ }; if (!allowed[mode_][mode]) { diff --git a/src/server_state.cpp b/src/server_state.cpp index b53c444..0b71349 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -60,25 +60,86 @@ static void log_adopt_error(const wsrep::transaction& transaction) << "which may need to be removed manually."; } -static int apply_fragment(wsrep::high_priority_service& high_priority_service, - wsrep::high_priority_service& streaming_applier, +// resolve which of the two errors return to caller +static inline int resolve_return_error(bool const vote, + int const vote_err, + int const apply_err) +{ + if (vote) return vote_err; + return vote_err != 0 ? vote_err : apply_err; +} + +static void +discard_streaming_applier(wsrep::server_state& server_state, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_meta& ws_meta) +{ + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_high_priority_service( + streaming_applier); +} + +static int apply_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) { - int ret; + int ret(0); + int apply_err; + wsrep::mutable_buffer err; { wsrep::high_priority_switch sw(high_priority_service, - streaming_applier); + *streaming_applier); + apply_err = streaming_applier->apply_write_set(ws_meta, data, err); + if (!apply_err) + { + assert(err.size() == 0); + streaming_applier->after_apply(); + } + else + { + bool const remove_fragments(streaming_applier->transaction( + ).streaming_context().fragments().size() > 0); + ret = streaming_applier->rollback(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); - ret = streaming_applier.apply_write_set(ws_meta, data); - streaming_applier.after_apply(); + if (remove_fragments) + { + ret = ret || streaming_applier->start_transaction(ws_handle, + ws_meta); + ret = ret || (streaming_applier->adopt_apply_error(err), 0); + ret = ret || streaming_applier->remove_fragments(ws_meta); + ret = ret || streaming_applier->commit(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + } + else + { + ret = streaming_applier->log_dummy_write_set(ws_handle, + ws_meta, err); + } + } } - high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); - ret = ret || high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data); - high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); - high_priority_service.after_apply(); + + if (!ret) + { + if (!apply_err) + { + high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); + ret = high_priority_service.append_fragment_and_commit( + ws_handle, ws_meta, data); + high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + discard_streaming_applier(server_state, streaming_applier,ws_meta); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } + } + return ret; } @@ -90,13 +151,27 @@ static int commit_fragment(wsrep::server_state& server_state, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) { - int ret; - // Make high priority switch to go out of scope - // before the streaming applier is released. + int ret(0); { wsrep::high_priority_switch sw( high_priority_service, *streaming_applier); - ret = streaming_applier->apply_write_set(ws_meta, data); + wsrep::mutable_buffer err; + int const apply_err( + streaming_applier->apply_write_set(ws_meta, data, err)); + if (apply_err) + { + assert(streaming_applier->transaction( + ).streaming_context().fragments().size() > 0); + ret = streaming_applier->rollback(ws_handle, ws_meta); + ret = ret || (streaming_applier->after_apply(), 0); + ret = ret || streaming_applier->start_transaction( + ws_handle, ws_meta); + ret = ret || (streaming_applier->adopt_apply_error(err),0); + } + else + { + assert(err.size() == 0); + } streaming_applier->debug_crash( "crash_apply_cb_before_fragment_removal"); ret = ret || streaming_applier->remove_fragments(ws_meta); @@ -107,15 +182,15 @@ static int commit_fragment(wsrep::server_state& server_state, ret = ret || streaming_applier->commit(ws_handle, ws_meta); streaming_applier->debug_crash( "crash_commit_cb_last_fragment_commit_success"); - streaming_applier->after_apply(); + ret = ret || (streaming_applier->after_apply(), 0); + ret = resolve_return_error(err.size() > 0, ret, apply_err); } - if (ret == 0) + + if (!ret) { - server_state.stop_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()); - server_state.server_service().release_high_priority_service( - streaming_applier); + discard_streaming_applier(server_state, streaming_applier, ws_meta); } + return ret; } @@ -129,6 +204,7 @@ static int rollback_fragment(wsrep::server_state& server_state, // Adopts transaction state and starts a transaction for // high priority service int adopt_error; + bool remove_fragments(false); if ((adopt_error = high_priority_service.adopt_transaction( streaming_applier->transaction()))) { @@ -137,25 +213,41 @@ static int rollback_fragment(wsrep::server_state& server_state, // Even if the adopt above fails we roll back the streaming transaction. // Adopt failure will leave stale entries in streaming log which can // be removed manually. + wsrep::const_buffer no_error; { wsrep::high_priority_switch ws( high_priority_service, *streaming_applier); // Streaming applier rolls back out of order. Fragment // removal grabs commit order below. - streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta()); - streaming_applier->after_apply(); + remove_fragments = streaming_applier->transaction(). + streaming_context().fragments().size() > 0; + ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta()); + ret = ret || (streaming_applier->after_apply(), 0); } - server_state.stop_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()); - server_state.server_service().release_high_priority_service( - streaming_applier); - if (adopt_error == 0) + if (!ret) { - high_priority_service.remove_fragments(ws_meta); - high_priority_service.commit(ws_handle, ws_meta); + discard_streaming_applier(server_state, streaming_applier, ws_meta); + + if (adopt_error == 0) + { + if (remove_fragments) + { + ret = high_priority_service.remove_fragments(ws_meta); + ret = ret || high_priority_service.commit(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + if (ws_meta.ordered()) + { + wsrep::mutable_buffer no_error; + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, no_error); + } + } + } } - high_priority_service.after_apply(); return ret; } @@ -166,14 +258,14 @@ static int apply_write_set(wsrep::server_state& server_state, const wsrep::const_buffer& data) { int ret(0); - // wsrep::log_info() << "apply_write_set: " << ws_meta; if (wsrep::rolls_back_transaction(ws_meta.flags())) { + wsrep::mutable_buffer no_error; if (wsrep::starts_transaction(ws_meta.flags())) { // No transaction existed before, log a dummy write set ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + ws_handle, ws_meta, no_error); } else { @@ -196,7 +288,7 @@ static int apply_write_set(wsrep::server_state& server_state, << ws_meta.server_id() << ": " << ws_meta.transaction_id()); ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + ws_handle, ws_meta, no_error); } else { @@ -212,14 +304,27 @@ static int apply_write_set(wsrep::server_state& server_state, else if (wsrep::starts_transaction(ws_meta.flags()) && wsrep::commits_transaction(ws_meta.flags())) { - ret = high_priority_service.start_transaction(ws_handle, ws_meta) || - high_priority_service.apply_write_set(ws_meta, data) || - high_priority_service.commit(ws_handle, ws_meta); - if (ret) + ret = high_priority_service.start_transaction(ws_handle, ws_meta); + if (!ret) { - high_priority_service.rollback(ws_handle, ws_meta); + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_write_set( + ws_meta, data, err)); + if (!apply_err) + { + assert(err.size() == 0); + ret = high_priority_service.commit(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + } + else + { + ret = high_priority_service.rollback(ws_handle, ws_meta); + ret = ret || (high_priority_service.after_apply(), 0); + ret = ret || high_priority_service.log_dummy_write_set( + ws_handle, ws_meta, err); + ret = resolve_return_error(err.size() > 0, ret, apply_err); + } } - high_priority_service.after_apply(); } else if (wsrep::starts_transaction(ws_meta.flags())) { @@ -231,8 +336,9 @@ static int apply_write_set(wsrep::server_state& server_state, server_state.start_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id(), sa); sa->start_transaction(ws_handle, ws_meta); - ret = apply_fragment(high_priority_service, - *sa, + ret = apply_fragment(server_state, + high_priority_service, + sa, ws_handle, ws_meta, data); @@ -252,13 +358,15 @@ static int apply_write_set(wsrep::server_state& server_state, wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() << ": " << ws_meta.transaction_id(); + wsrep::mutable_buffer no_error; ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + ws_handle, ws_meta, no_error); } else { - ret = apply_fragment(high_priority_service, - *sa, + ret = apply_fragment(server_state, + high_priority_service, + sa, ws_handle, ws_meta, data); @@ -268,9 +376,10 @@ static int apply_write_set(wsrep::server_state& server_state, { if (high_priority_service.is_replaying()) { + wsrep::mutable_buffer unused; ret = high_priority_service.start_transaction( ws_handle, ws_meta) || - high_priority_service.apply_write_set(ws_meta, data) || + high_priority_service.apply_write_set(ws_meta, data, unused) || high_priority_service.commit(ws_handle, ws_meta); } else @@ -289,8 +398,9 @@ static int apply_write_set(wsrep::server_state& server_state, << "Could not find applier context for " << ws_meta.server_id() << ": " << ws_meta.transaction_id(); + wsrep::mutable_buffer no_error; ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + ws_handle, ws_meta, no_error); } else { @@ -327,14 +437,11 @@ static int apply_toi(wsrep::provider& provider, // // Regular TOI. // - // Note that we ignore error returned by apply_toi - // call here. This must be revised after the error - // voting is added. - // provider.commit_order_enter(ws_handle, ws_meta); - (void)high_priority_service.apply_toi(ws_meta, data); - provider.commit_order_leave(ws_handle, ws_meta); - return 0; + wsrep::mutable_buffer err; + int const apply_err(high_priority_service.apply_toi(ws_meta,data,err)); + int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err)); + return resolve_return_error(err.size() > 0, vote_err, apply_err); } else if (wsrep::starts_transaction(ws_meta.flags())) { @@ -987,9 +1094,9 @@ void wsrep::server_state::start_streaming_client( wsrep::client_state* client_state) { wsrep::unique_lock lock(mutex_); - WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), - wsrep::log::debug_level_server_state, - "Start streaming client: " << client_state->id()); + WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), + wsrep::log::debug_level_server_state, + "Start streaming client: " << client_state->id()); if (streaming_clients_.insert( std::make_pair(client_state->id(), client_state)).second == false) { diff --git a/src/transaction.cpp b/src/transaction.cpp index 1607ce6..b6048d7 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -107,6 +107,7 @@ wsrep::transaction::transaction( , fragments_certified_for_statement_() , streaming_context_() , sr_keys_() + , apply_error_buf_() { } @@ -469,7 +470,8 @@ int wsrep::transaction::ordered_commit() assert(state() == s_committing); assert(ordered()); client_service_.debug_sync("wsrep_before_commit_order_leave"); - int ret(provider().commit_order_leave(ws_handle_, ws_meta_)); + int ret(provider().commit_order_leave(ws_handle_, ws_meta_, + apply_error_buf_)); client_service_.debug_sync("wsrep_after_commit_order_leave"); // Should always succeed: // 1) If before commit before succeeds, the transaction handle @@ -670,6 +672,16 @@ int wsrep::transaction::after_rollback() return 0; } +int wsrep::transaction::release_commit_order( + wsrep::unique_lock& lock) +{ + lock.unlock(); + int ret(provider().commit_order_enter(ws_handle_, ws_meta_)); + lock.lock(); + return ret || provider().commit_order_leave(ws_handle_, ws_meta_, + apply_error_buf_); +} + int wsrep::transaction::after_statement() { int ret(0); @@ -767,13 +779,7 @@ int wsrep::transaction::after_statement() { if (ordered()) { - lock.unlock(); - ret = provider().commit_order_enter(ws_handle_, ws_meta_); - lock.lock(); - if (ret == 0) - { - provider().commit_order_leave(ws_handle_, ws_meta_); - } + ret = release_commit_order(lock); } provider().release(ws_handle_); } @@ -795,25 +801,6 @@ void wsrep::transaction::after_applying() assert(state_ == s_executing || state_ == s_committed || state_ == s_aborted); - - // We may enter here from either high priority applier or - // from fragment storage service. High priority applier - // should always have set up meta data for ordering, but - // fragment storage service operation may be rolled back - // before the fragment is ordered and certified. - // Therefore we need to check separately if the ordering has - // been done. - if (state_ == s_aborted && ordered()) - { - lock.unlock(); - int ret(provider().commit_order_enter(ws_handle_, ws_meta_)); - lock.lock(); - if (ret == 0) - { - provider().commit_order_leave(ws_handle_, ws_meta_); - } - } - if (state_ != s_executing) { cleanup(); @@ -1581,6 +1568,7 @@ void wsrep::transaction::cleanup() sr_keys_.clear(); streaming_context_.cleanup(); client_service_.cleanup_transaction(); + apply_error_buf_.clear(); debug_log_state("cleanup_leave"); } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 3fee80e..92640e3 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -788,13 +788,18 @@ wsrep::wsrep_provider_v26::commit_order_enter( wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native())); } -int wsrep::wsrep_provider_v26::commit_order_leave( +int +wsrep::wsrep_provider_v26::commit_order_leave( const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) + const wsrep::ws_meta& ws_meta, + const wsrep::mutable_buffer& err) { const_ws_handle cwsh(ws_handle); const_ws_meta cwsm(ws_meta); - return (wsrep_->commit_order_leave(wsrep_, cwsh.native(), cwsm.native(), 0) != WSREP_OK); + wsrep_buf_t const err_buf = { err.data(), err.size() }; + int ret(wsrep_->commit_order_leave( + wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK); + return ret; } int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle) @@ -851,10 +856,12 @@ wsrep::wsrep_provider_v26::enter_toi( } enum wsrep::provider::status -wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id) +wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id, + const wsrep::mutable_buffer& err) { + const wsrep_buf_t err_buf = { err.data(), err.size() }; return map_return_value(wsrep_->to_execute_end( - wsrep_, client_id.get(), 0)); + wsrep_, client_id.get(), &err_buf)); } std::pair diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 12d6265..b3376e2 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -64,7 +64,8 @@ namespace wsrep commit_order_enter(const wsrep::ws_handle&, const wsrep::ws_meta&); int commit_order_leave(const wsrep::ws_handle&, - const wsrep::ws_meta&); + const wsrep::ws_meta&, + const wsrep::mutable_buffer&); int release(wsrep::ws_handle&); enum wsrep::provider::status replay(const wsrep::ws_handle&, wsrep::high_priority_service*); @@ -73,7 +74,8 @@ namespace wsrep const wsrep::const_buffer&, wsrep::ws_meta&, int); - enum wsrep::provider::status leave_toi(wsrep::client_id); + enum wsrep::provider::status leave_toi(wsrep::client_id, + const wsrep::mutable_buffer&); std::pair causal_read(int) const; enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&, int) const; diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 54cb20a..ee6f82c 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -19,6 +19,7 @@ #include "mock_high_priority_service.hpp" #include "mock_server_state.hpp" +#include int wsrep::mock_high_priority_service::start_transaction( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) @@ -34,13 +35,25 @@ int wsrep::mock_high_priority_service::adopt_transaction( } int wsrep::mock_high_priority_service::apply_write_set( - const wsrep::ws_meta&, - const wsrep::const_buffer&) + const wsrep::ws_meta& meta, + const wsrep::const_buffer&, + wsrep::mutable_buffer& err) { assert(client_state_->toi_meta().seqno().is_undefined()); assert(client_state_->transaction().state() == wsrep::transaction::s_executing || client_state_->transaction().state() == wsrep::transaction::s_replaying); - return (fail_next_applying_ ? 1 : 0); + if (fail_next_applying_) + { + std::ostringstream os; + os << "failed " << meta; + err.push_back(os.str()); + assert(err.size() > 0); + return 1; + } + else + { + return 0; + }; } int wsrep::mock_high_priority_service::commit( @@ -69,14 +82,29 @@ int wsrep::mock_high_priority_service::rollback( } int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&, - const wsrep::const_buffer&) + const wsrep::const_buffer&, + wsrep::mutable_buffer&) { assert(client_state_->transaction().active() == false); assert(client_state_->toi_meta().seqno().is_undefined() == false); return (fail_next_toi_ ? 1 : 0); } +void wsrep::mock_high_priority_service::adopt_apply_error( + wsrep::mutable_buffer& err) +{ + client_state_->adopt_apply_error(err); +} + void wsrep::mock_high_priority_service::after_apply() { client_state_->after_applying(); } + +int wsrep::mock_high_priority_service::log_dummy_write_set( + const wsrep::ws_handle&, + const wsrep::ws_meta&, + wsrep::mutable_buffer& err) +{ + return err.size() > 0; +} diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index 5e204cd..c93a6a6 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -47,7 +47,8 @@ namespace wsrep { return client_state_->transaction(); } int adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; int apply_write_set(const wsrep::ws_meta&, - const wsrep::const_buffer&) WSREP_OVERRIDE; + const wsrep::const_buffer&, + wsrep::mutable_buffer&) WSREP_OVERRIDE; int append_fragment_and_commit( const wsrep::ws_handle&, const wsrep::ws_meta&, @@ -59,21 +60,23 @@ namespace wsrep WSREP_OVERRIDE; int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, - const wsrep::const_buffer&) WSREP_OVERRIDE; + const wsrep::const_buffer&, + wsrep::mutable_buffer&) WSREP_OVERRIDE; + void adopt_apply_error(wsrep::mutable_buffer& err) WSREP_OVERRIDE; void after_apply() WSREP_OVERRIDE; void store_globals() WSREP_OVERRIDE { } void reset_globals() WSREP_OVERRIDE { } void switch_execution_context(wsrep::high_priority_service&) WSREP_OVERRIDE { } int log_dummy_write_set(const wsrep::ws_handle&, - const wsrep::ws_meta&) - WSREP_OVERRIDE { return 0; } + const wsrep::ws_meta&, + wsrep::mutable_buffer&) WSREP_OVERRIDE; bool is_replaying() const WSREP_OVERRIDE { return replaying_; } void debug_crash(const char*) WSREP_OVERRIDE { /* Not in unit tests*/} - wsrep::mock_client_state* client_state() + wsrep::client_state& client_state() { - return client_state_; + return *client_state_; } bool do_2pc_; bool fail_next_applying_; diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index e1d9489..2146426 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -172,12 +172,15 @@ namespace wsrep } int commit_order_leave(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) + const wsrep::ws_meta& ws_meta, + const wsrep::mutable_buffer& err) WSREP_OVERRIDE { BOOST_REQUIRE(ws_handle.opaque()); BOOST_REQUIRE(ws_meta.seqno().is_undefined() == false); - return commit_order_leave_result_; + return err.size() > 0 ? + wsrep::provider::error_fatal : + commit_order_leave_result_; } int release(wsrep::ws_handle& ) @@ -195,7 +198,8 @@ namespace wsrep wsrep::mock_high_priority_service& high_priority_service( *static_cast(hps)); wsrep::mock_client_state& cc( - *high_priority_service.client_state()); + static_cast( + high_priority_service.client_state())); wsrep::high_priority_context high_priority_context(cc); const wsrep::transaction& tc(cc.transaction()); wsrep::ws_meta ws_meta; @@ -238,7 +242,8 @@ namespace wsrep int) WSREP_OVERRIDE { return wsrep::provider::success; } - enum wsrep::provider::status leave_toi(wsrep::client_id) + enum wsrep::provider::status leave_toi(wsrep::client_id, + const wsrep::mutable_buffer&) WSREP_OVERRIDE { return wsrep::provider::success; } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index fab755d..23ee128 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -102,7 +102,7 @@ namespace wsrep { mock_high_priority_service* mhps( static_cast(high_priority_service)); - wsrep::mock_client* cs(static_cast( + wsrep::mock_client* cs(&static_cast( mhps->client_state())); cs->after_command_before_result(); cs->after_command_after_result(); diff --git a/test/mock_storage_service.cpp b/test/mock_storage_service.cpp index a79ced7..ca65794 100644 --- a/test/mock_storage_service.cpp +++ b/test/mock_storage_service.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index aec7b6b..47dea46 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * @@ -239,6 +239,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc, BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback, applying_server_fixture) { + /* make sure default success result is flipped to error_fatal */ + ss.provider().commit_order_leave_result_ = wsrep::provider::success; hps.fail_next_applying_ = true; char buf[1] = { 1 }; BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, @@ -252,6 +254,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback, BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback, applying_server_fixture) { + /* make sure default success result is flipped to error_fatal */ + ss.provider().commit_order_leave_result_ = wsrep::provider::success; hps.do_2pc_ = true; hps.fail_next_applying_ = true; char buf[1] = { 1 }; diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index c62ef4b..73f3b6b 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. * diff --git a/test/transaction_test_2pc.cpp b/test/transaction_test_2pc.cpp index 63645d3..9e8ee5e 100644 --- a/test/transaction_test_2pc.cpp +++ b/test/transaction_test_2pc.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Codership Oy + * Copyright (C) 2018-2019 Codership Oy * * This file is part of wsrep-lib. *