diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 2e546f8..5e40106 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -16,6 +16,7 @@ #include #include +#include namespace wsrep { @@ -128,6 +129,16 @@ namespace wsrep int flags_; }; + static inline + std::ostream& operator<<(std::ostream& os, const wsrep::ws_meta& ws_meta) + { + os << "gtid: " << ws_meta.gtid() + << " server_id: " << ws_meta.server_id() + << " client_id: " << ws_meta.client_id() + << " trx_id: " << ws_meta.transaction_id() + << " flags: " << ws_meta.flags(); + return os; + } // Abstract interface for provider implementations class provider diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 039f0fa..a5dbadd 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -24,7 +24,8 @@ namespace wsrep }; streaming_context() - : fragments_() + : fragments_certified_() + , fragments_() , rollback_replicated_for_() , fragment_unit_() , fragment_size_() @@ -51,22 +52,33 @@ namespace wsrep fragment_size_ = 0; } - void certified(wsrep::seqno seqno, size_t bytes) + void certified(size_t bytes) { - fragments_.push_back(seqno); + ++fragments_certified_; bytes_certified_ += bytes; } - void applied(wsrep::seqno seqno) - { - fragments_.push_back(seqno); - } - size_t fragments_certified() const + { + return fragments_certified_; + } + + void stored(wsrep::seqno seqno) + { + fragments_.push_back(seqno); + } + + size_t fragments_stored() const { return fragments_.size(); } + void applied(wsrep::seqno seqno) + { + ++fragments_certified_; + fragments_.push_back(seqno); + } + size_t bytes_certified() const { return bytes_certified_; @@ -84,22 +96,36 @@ namespace wsrep wsrep::transaction_id::undefined()); } - size_t unit_counter() const { return unit_counter_; } + size_t unit_counter() const + { + return unit_counter_; + } + void increment_unit_counter(size_t inc) - { unit_counter_ += inc; } - void reset_unit_counter() { unit_counter_ = 0; } + { + unit_counter_ += inc; + } + + void reset_unit_counter() + { + unit_counter_ = 0; + } + const std::vector& fragments() const { return fragments_; } + void cleanup() { + fragments_certified_ = 0; fragments_.clear(); rollback_replicated_for_ = wsrep::transaction_id::undefined(); bytes_certified_ = 0; unit_counter_ = 0; } private: + size_t fragments_certified_; std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; enum fragment_unit fragment_unit_; diff --git a/src/client_state.cpp b/src/client_state.cpp index 52bb118..21987fe 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -236,7 +236,7 @@ int wsrep::client_state::enable_streaming( size_t fragment_size) { assert(mode_ == m_local); - if (transaction_.streaming_context().fragments_certified() && + if (transaction_.is_streaming() && transaction_.streaming_context().fragment_unit() != fragment_unit) { diff --git a/src/server_state.cpp b/src/server_state.cpp index 0068b51..63ded8d 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -120,13 +120,45 @@ static int apply_write_set(wsrep::server_state& server_state, const wsrep::const_buffer& data) { int ret(0); - if (wsrep::starts_transaction(ws_meta.flags()) && - wsrep::commits_transaction(ws_meta.flags()) && - wsrep::rolls_back_transaction(ws_meta.flags())) + // wsrep::log_info() << "apply_write_set: " << ws_meta; + if (wsrep::rolls_back_transaction(ws_meta.flags())) { - // Non streaming rollback (certification failed) - ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + if (wsrep::starts_transaction(ws_meta.flags())) + { + // No transaction existed before, log a dummy write set + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); + } + else + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() + << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); + } + else + { + // rollback_fragment() consumes sa + ret = rollback_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + } } else if (wsrep::starts_transaction(ws_meta.flags()) && wsrep::commits_transaction(ws_meta.flags())) @@ -225,36 +257,6 @@ static int apply_write_set(wsrep::server_state& server_state, } } } - else if (wsrep::rolls_back_transaction(ws_meta.flags())) - { - wsrep::high_priority_service* sa( - server_state.find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id())); - if (sa == 0) - { - // It is possible that rapid group membership changes - // may cause streaming transaction be rolled back before - // commit fragment comes in. Although this is a valid - // situation, log a warning if a sac cannot be found as - // it may be an indication of a bug too. - wsrep::log_warning() - << "Could not find applier context for " - << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); - ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); - } - else - { - // Rollback fragment consumes sa - ret = rollback_fragment(server_state, - high_priority_service, - sa, - ws_handle, - ws_meta, - data); - } - } else { assert(0); diff --git a/src/transaction.cpp b/src/transaction.cpp index 83cfce4..9b1c262 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -434,6 +434,7 @@ int wsrep::transaction::before_commit() if (ret) { state(lock, s_must_abort); + state(lock, s_aborting); } break; default: @@ -459,8 +460,22 @@ int wsrep::transaction::ordered_commit() // aborted anymore // 3) The provider should always guarantee that the transactions which // have been ordered for commit can finish committing. - assert(ret == 0); - state(lock, s_ordered_commit); + // + // The exception here is a storage service transaction which is running + // in high priority mode. The fragment storage commit may get BF + // aborted in the provider after commit ordering has been + // established since the transaction is operating in streaming + // mode. + if (ret) + { + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + state(lock, s_must_abort); + state(lock, s_aborting); + } + else + { + state(lock, s_ordered_commit); + } debug_log_state("ordered_commit_leave"); return ret; } @@ -1024,9 +1039,11 @@ int wsrep::transaction::certify_fragment( { case wsrep::provider::success: assert(sr_ws_meta.seqno().is_undefined() == false); - streaming_context_.certified(sr_ws_meta.seqno(), data.size()); + streaming_context_.certified(data.size()); if (storage_service.update_fragment_meta(sr_ws_meta)) { + storage_service.rollback(wsrep::ws_handle(), + wsrep::ws_meta()); ret = 1; break; } @@ -1034,9 +1051,16 @@ int wsrep::transaction::certify_fragment( { ret = 1; } + else + { + streaming_context_.stored(sr_ws_meta.seqno()); + } break; default: - storage_service.rollback(ws_handle_, sr_ws_meta); + // Storage service rollback must be done out of order, + // otherwise there may be a deadlock between BF aborter + // and the rollback process. + storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta()); ret = 1; break; } @@ -1044,8 +1068,13 @@ int wsrep::transaction::certify_fragment( // Note: This does not release the handle in the provider // since streaming is still on. However it is needed to // make provider internal state to transition for the - // next fragment. - provider().release(ws_handle_); + // next fragment. If any of the operations above failed, + // the handle needs to be left unreleased for the following + // rollback process. + if (ret == 0) + { + provider().release(ws_handle_); + } lock.lock(); if (ret) { @@ -1152,8 +1181,8 @@ int wsrep::transaction::certify_commit( break; case s_must_abort: // We got BF aborted after succesful certification - // and before acquiring client context lock. This means that - // the trasaction must be replayed. + // and before acquiring client state lock. The trasaction + // must be replayed. client_service_.will_replay(); state(lock, s_must_replay); break; @@ -1266,6 +1295,7 @@ void wsrep::transaction::streaming_rollback() debug_log_state("streaming_rollback enter"); assert(state_ != s_must_replay); assert(streaming_context_.rolled_back() == false); + assert(is_streaming()); if (bf_aborted_in_total_order_) {