diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index f260f52..e3788ab 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -20,6 +20,11 @@ int db::high_priority_service::start_transaction( return client_.client_state().start_transaction(ws_handle, ws_meta); } +const wsrep::transaction& db::high_priority_service::transaction() const +{ + return client_.client_state().transaction(); +} + void db::high_priority_service::adopt_transaction(const wsrep::transaction&) { throw wsrep::not_implemented_error(); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index f0479ad..aeaca90 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -17,6 +17,7 @@ namespace db high_priority_service(db::server& server, db::client& client); int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) override; + const wsrep::transaction& transaction() const override; void adopt_transaction(const wsrep::transaction&) override; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) override; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index dcac518..fccb5ea 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -434,11 +434,17 @@ namespace wsrep return transaction_.start_replaying(ws_meta); } + /** + * Adopt a streaming transaction state. This is must be + * called from high_priority_service::adopt_transaction() + * during streaming transaction rollback. The call will + * set up enough context for handling the rollback + * fragment. + */ void adopt_transaction(const wsrep::transaction& transaction) { assert(mode_ == m_high_priority); - transaction_.start_transaction(transaction.id()); - transaction_.streaming_context() = transaction.streaming_context(); + transaction_.adopt(transaction); } /** @name Non-transactional operations */ diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index e8d04c8..12fc49c 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -36,6 +36,12 @@ namespace wsrep virtual int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; + /** + * Return transaction object associated to high priority + * service state. + */ + virtual const wsrep::transaction& transaction() const = 0; + /** * Adopt a transaction. */ @@ -83,8 +89,18 @@ namespace wsrep /** * Commit a transaction. + * An implementation must call + * wsrep::client_state::prepare_for_ordering() to set + * the ws_handle and ws_meta before the commit if the + * commit process will go through client state commit + * processing. Otherwise the implementation must release + * commit order explicitly via provider. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta */ - virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; + virtual int commit(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) = 0; /** * Roll back a transaction */ diff --git a/include/wsrep/key.hpp b/include/wsrep/key.hpp index 152732b..a367db2 100644 --- a/include/wsrep/key.hpp +++ b/include/wsrep/key.hpp @@ -8,6 +8,8 @@ #include "exception.hpp" #include "buffer.hpp" +#include + namespace wsrep { class key @@ -60,6 +62,7 @@ namespace wsrep typedef std::vector key_array; + std::ostream& operator<<(std::ostream&, const wsrep::key&); } #endif // WSREP_KEY_HPP diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 68392c6..2e546f8 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -262,7 +262,7 @@ namespace wsrep virtual enum status bf_abort(wsrep::seqno bf_seqno, wsrep::transaction_id victim_trx, wsrep::seqno& victim_seqno) = 0; - virtual int rollback(wsrep::transaction_id) = 0; + virtual enum status rollback(wsrep::transaction_id) = 0; virtual enum status commit_order_enter(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; virtual int commit_order_leave(const wsrep::ws_handle&, diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 3a36e16..1732ed2 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -88,6 +88,7 @@ namespace wsrep int start_transaction(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta); + void adopt(const transaction& transaction); void fragment_applied(wsrep::seqno seqno); int prepare_for_ordering(const wsrep::ws_handle& ws_handle, @@ -155,6 +156,7 @@ namespace wsrep void clear_fragments(); void cleanup(); void debug_log_state(const char*) const; + void debug_log_key_append(const wsrep::key& key); wsrep::server_service& server_service_; wsrep::client_service& client_service_; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5357a25..ad1cba1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(wsrep-lib exception.cpp gtid.cpp id.cpp + key.cpp logger.cpp provider.cpp seqno.cpp diff --git a/src/client_state.cpp b/src/client_state.cpp index e051a6a..1f32fac 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -36,6 +36,7 @@ void wsrep::client_state::close() if (transaction_.active()) { client_service_.bf_rollback(); + transaction_.after_statement(); } debug_log_state("close: leave"); } @@ -421,7 +422,19 @@ void wsrep::client_state::state( wsrep::unique_lock& lock WSREP_UNUSED, enum wsrep::client_state::state state) { - assert(wsrep::this_thread::get_id() == owning_thread_id_); + // For locally processing client states (local, toi, rsu) + // changing the state is allowed only from the owning thread. + // In high priority mode the processing thread however may + // change and we check only that store_globals() has been + // called by the current thread to gain ownership. + // + // Note that this check assumes that there is always a single + // thread per local client connection. This may not always + // hold and the sanity check mechanism may need to be revised. + assert((mode_ != m_high_priority && + wsrep::this_thread::get_id() == owning_thread_id_) || + (mode_ == m_high_priority && + wsrep::this_thread::get_id() == current_thread_id_)); assert(lock.owns_lock()); static const char allowed[state_max_][state_max_] = { diff --git a/src/key.cpp b/src/key.cpp new file mode 100644 index 0000000..281805c --- /dev/null +++ b/src/key.cpp @@ -0,0 +1,36 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "wsrep/key.hpp" +#include +#include + +namespace +{ + void print_key_part(std::ostream& os, const void* ptr, size_t len) + { + std::ios::fmtflags flags_save(os.flags()); + os << len << ": "; + for (size_t i(0); i < len; ++i) + { + os << std::hex + << std::setfill('0') + << std::setw(2) + << static_cast( + *(reinterpret_cast(ptr) + i)) << " "; + } + os.flags(flags_save); + } +} + +std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::key& key) +{ + os << "type: " << key.type(); + for (size_t i(0); i < key.size(); ++i) + { + os << "\n "; + print_key_part(os, key.key_parts()[i].data(), key.key_parts()[i].size()); + } + return os; +} diff --git a/src/server_state.cpp b/src/server_state.cpp index fd01627..87792cc 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -28,6 +28,79 @@ namespace return (bootstrap || cluster_address == "gcomm://"); } + int apply_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service& streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) + { + int ret; + { + wsrep::high_priority_switch sw(high_priority_service, + streaming_applier); + ret = streaming_applier.apply_write_set(ws_meta, data); + streaming_applier.after_apply(); + } + ret = ret || high_priority_service.append_fragment_and_commit( + ws_handle, ws_meta, data); + high_priority_service.after_apply(); + return ret; + } + + + int commit_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) + { + int ret; + // Make high priority switch to go out of scope + // before the streaming applier is released. + { + wsrep::high_priority_switch sw( + high_priority_service, *streaming_applier); + streaming_applier->remove_fragments(ws_meta); + ret = streaming_applier->commit(ws_handle, ws_meta); + streaming_applier->after_apply(); + } + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_high_priority_service( + streaming_applier); + return ret; + } + + int rollback_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) + { + int ret= 0; + // Adopts transaction state and starts a transaction for + // high priority service + high_priority_service.adopt_transaction( + streaming_applier->transaction()); + { + wsrep::high_priority_switch ws( + high_priority_service, *streaming_applier); + streaming_applier->rollback(); + streaming_applier->after_apply(); + } + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_high_priority_service( + streaming_applier); + + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(ws_handle, ws_meta); + return ret; + } + int apply_write_set(wsrep::server_state& server_state, wsrep::high_priority_service& high_priority_service, const wsrep::ws_handle& ws_handle, @@ -36,6 +109,14 @@ namespace { int ret(0); if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags()) && + wsrep::rolls_back_transaction(ws_meta.flags())) + { + // Non streaming rollback (certification failed) + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); + } + else if (wsrep::starts_transaction(ws_meta.flags()) && wsrep::commits_transaction(ws_meta.flags())) { if (high_priority_service.start_transaction(ws_handle, ws_meta)) @@ -66,13 +147,12 @@ namespace server_state.start_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id(), sa); sa->start_transaction(ws_handle, ws_meta); - { - wsrep::high_priority_switch sw(high_priority_service, *sa); - sa->apply_write_set(ws_meta, data); - sa->after_apply(); - } - high_priority_service.append_fragment_and_commit(ws_handle, ws_meta, data); - high_priority_service.after_apply(); + ret = apply_fragment(server_state, + high_priority_service, + *sa, + ws_handle, + ws_meta, + data); } else if (ws_meta.flags() == 0) { @@ -92,17 +172,13 @@ namespace } else { - wsrep::high_priority_switch sw(high_priority_service, *sa); - ret = sa->apply_write_set(ws_meta, data); - sa->after_apply(); + ret = apply_fragment(server_state, + high_priority_service, + *sa, + ws_handle, + ws_meta, + data); } - ret = ret || high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data); - if (ret) - { - high_priority_service.rollback(); - } - high_priority_service.after_apply(); } else if (wsrep::commits_transaction(ws_meta.flags())) { @@ -130,24 +206,48 @@ namespace } else { - // Make high priority switch to go out of scope - // before the streaming applier is released. - { - wsrep::high_priority_switch sw( - high_priority_service, *sa); - sa->remove_fragments(ws_meta); - ret = sa->commit(ws_handle, ws_meta); - sa->after_apply(); - } - server_state.stop_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()); - server_state.server_service().release_high_priority_service(sa); + // Commit fragment consumes sa + ret = commit_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); } } } + else if (wsrep::rolls_back_transaction(ws_meta.flags())) + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() + << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); + } + else + { + // Rollback fragment consumes sa + ret = rollback_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); + } + } else { - // SR fragment applying not implemented yet assert(0); } return ret; @@ -633,13 +733,6 @@ int wsrep::server_state::on_apply( const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) { - if (rolls_back_transaction(ws_meta.flags())) - { - provider().commit_order_enter(ws_handle, ws_meta); - // todo: server_service_.log_dummy_write_set(); - provider().commit_order_leave(ws_handle, ws_meta); - return 0; - } if (is_toi(ws_meta.flags())) { return apply_toi(provider(), high_priority_service, diff --git a/src/transaction.cpp b/src/transaction.cpp index b801e18..073a1e7 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -111,11 +111,12 @@ int wsrep::transaction::start_transaction( { debug_log_state("start_transaction enter"); assert(active() == false); + assert(flags() == 0); id_ = id; state_ = s_executing; state_hist_.clear(); ws_handle_ = wsrep::ws_handle(id); - flags_ |= wsrep::provider::flag::start_transaction; + flags(wsrep::provider::flag::start_transaction); switch (client_state_.mode()) { case wsrep::client_state::m_high_priority: @@ -140,12 +141,14 @@ int wsrep::transaction::start_transaction( { // assert(ws_meta.flags()); assert(active() == false); + assert(flags() == 0); id_ = ws_meta.transaction_id(); assert(client_state_.mode() == wsrep::client_state::m_high_priority); state_ = s_executing; state_hist_.clear(); ws_handle_ = ws_handle; ws_meta_ = ws_meta; + flags(wsrep::provider::flag::start_transaction); certified_ = true; } else @@ -156,6 +159,16 @@ int wsrep::transaction::start_transaction( return 0; } +void wsrep::transaction::adopt(const wsrep::transaction& transaction) +{ + debug_log_state("adopt enter"); + assert(transaction.is_streaming()); + start_transaction(transaction.id()); + flags_ = transaction.flags(); + streaming_context_ = transaction.streaming_context(); + debug_log_state("adopt leave"); +} + void wsrep::transaction::fragment_applied(wsrep::seqno seqno) { assert(active()); @@ -195,6 +208,7 @@ int wsrep::transaction::append_key(const wsrep::key& key) /** @todo Collect table level keys for SR commit */ try { + debug_log_key_append(key); sr_keys_.insert(key); return provider().append_key(ws_handle_, key); } @@ -493,49 +507,62 @@ int wsrep::transaction::before_rollback() debug_log_state("before_rollback_enter"); assert(state() == s_executing || state() == s_must_abort || - state() == s_aborting || // Background rollbacker + // Background rollbacker or rollback initiated from SE + state() == s_aborting || state() == s_cert_failed || state() == s_must_replay); - switch (state()) + switch (client_state_.mode()) { - case s_executing: - // Voluntary rollback - if (is_streaming()) - { - streaming_rollback(); - } - state(lock, s_aborting); - break; - case s_must_abort: - if (certified()) - { - state(lock, s_must_replay); - } - else + case wsrep::client_state::m_local: + switch (state()) { + case s_executing: + // Voluntary rollback if (is_streaming()) { streaming_rollback(); } state(lock, s_aborting); + break; + case s_must_abort: + if (certified()) + { + state(lock, s_must_replay); + } + else + { + if (is_streaming()) + { + streaming_rollback(); + } + state(lock, s_aborting); + } + break; + case s_cert_failed: + if (is_streaming()) + { + streaming_rollback(); + } + state(lock, s_aborting); + break; + case s_aborting: + if (is_streaming()) + { + provider().rollback(id_.get()); + } + break; + case s_must_replay: + break; + default: + assert(0); + break; } break; - case s_cert_failed: - if (is_streaming()) - { - streaming_rollback(); - } + case wsrep::client_state::m_high_priority: + assert(state_ == s_executing); state(lock, s_aborting); break; - case s_aborting: - if (is_streaming()) - { - provider().rollback(id_.get()); - } - break; - case s_must_replay: - break; default: assert(0); break; @@ -551,6 +578,11 @@ int wsrep::transaction::after_rollback() assert(state() == s_aborting || state() == s_must_replay); + if (is_streaming()) + { + clear_fragments(); + } + if (state() == s_aborting) { state(lock, s_aborted); @@ -709,6 +741,16 @@ void wsrep::transaction::after_applying() { cleanup(); } + else + { + // State remains executing, so this is a streaming applier. + // Reset the meta data to avoid releasing commit order + // critical section above if the next fragment is rollback + // fragment. Rollback fragment ordering will be handled by + // another instance while removing the fragments from + // storage. + ws_meta_ = wsrep::ws_meta(); + } debug_log_state("after_applying leave"); } @@ -804,6 +846,13 @@ void wsrep::transaction::state( << " -> " << to_string(next_state); } assert(lock.owns_lock()); + // BF aborter is allowed to change the state to must abort and + // further to aborting and aborted if the background rollbacker + // is launched. + assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || + next_state == s_must_abort || + next_state == s_aborting || + next_state == s_aborted); static const char allowed[n_states][n_states] = { /* ex pr ce co oc ct cf ma ab ad mr re */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ @@ -898,7 +947,7 @@ int wsrep::transaction::certify_fragment( if (storage_service.append_fragment( server_id, id(), - flags_, + flags(), wsrep::const_buffer(data.data(), data.size()))) { lock.lock(); @@ -911,7 +960,7 @@ int wsrep::transaction::certify_fragment( enum wsrep::provider::status cert_ret(provider().certify(client_state_.id().get(), ws_handle_, - flags_, + flags(), sr_ws_meta)); switch (cert_ret) { @@ -933,8 +982,12 @@ int wsrep::transaction::certify_fragment( ret = 1; break; } - provider().release(ws_handle_); } + // Note: This does not release the handle in the provider + // since streaming is still on. However it is needed to + // make provider internal state to transition for the + // next fragment. + provider().release(ws_handle_); lock.lock(); if (ret) { @@ -943,7 +996,7 @@ int wsrep::transaction::certify_fragment( else { state(lock, s_executing); - flags_ &= ~wsrep::provider::flag::start_transaction; + flags(flags() & ~wsrep::provider::flag::start_transaction); } return ret; } @@ -1139,7 +1192,13 @@ int wsrep::transaction::append_sr_keys_for_commit() void wsrep::transaction::streaming_rollback() { + debug_log_state("streaming_rollback enter"); assert(streaming_context_.rolled_back() == false); + // Create a high priority applier which will handle the + // rollback fragment or clean up on configuration change. + // Adopt transaction will copy fragment set and appropriate + // meta data. Mark current transaction streaming context + // rolled back. wsrep::high_priority_service* sa( server_service_.streaming_applier_service( client_state_.client_service())); @@ -1147,8 +1206,14 @@ void wsrep::transaction::streaming_rollback() client_state_.server_state().id(), id(), sa); sa->adopt_transaction(*this); streaming_context_.cleanup(); - // Replicate rollback fragment - provider().rollback(id_.get()); + streaming_context_.rolled_back(id_); + enum wsrep::provider::status ret; + if ((ret = provider().rollback(id_.get()))) + { + wsrep::log_warning() << "Failed to replicate rollback fragment for " + << id_.get() << ": " << ret; + } + debug_log_state("streaming_rollback leave"); } void wsrep::transaction::clear_fragments() @@ -1174,8 +1239,10 @@ void wsrep::transaction::cleanup() bf_abort_provider_status_ = wsrep::provider::success; bf_abort_client_state_ = 0; ws_meta_ = wsrep::ws_meta(); + flags_ = 0; certified_ = false; pa_unsafe_ = false; + streaming_context_.cleanup(); client_service_.cleanup_transaction(); debug_log_state("cleanup_leave"); } @@ -1185,11 +1252,30 @@ void wsrep::transaction::debug_log_state( { WSREP_TC_LOG_DEBUG( 1, context - << "(" << client_state_.id().get() - << "," << int64_t(id_.get()) - << "," << ws_meta_.seqno().get() - << "," << wsrep::to_string(state_) - << "," << wsrep::to_string(bf_abort_state_) - << "," << wsrep::to_string(client_state_.current_error()) - << ")"); + << "\n server: " << client_state_.server_state().id() + << ", client: " << client_state_.id().get() + << ", state: " << wsrep::to_c_string(client_state_.state()) + << ", mode: " << wsrep::to_c_string(client_state_.mode()) + << "\n trx_id: " << int64_t(id_.get()) + << ", seqno: " << ws_meta_.seqno().get() + << ", flags: " << flags() + << "\n" + << " state: " << wsrep::to_c_string(state_) + << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_) + << ", error: " << wsrep::to_c_string(client_state_.current_error()) + << "\n" + << " is_sr: " << is_streaming() + << ", frags: " << streaming_context_.fragments_certified() + << ", bytes: " << streaming_context_.bytes_certified() + << ", sr_rb: " << streaming_context_.rolled_back() + << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) + << ""); +} + +void wsrep::transaction::debug_log_key_append(const wsrep::key& key) +{ + WSREP_TC_LOG_DEBUG(2, "key_append" + << "trx_id: " + << int64_t(id().get()) + << " append key: " << key); } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index f592f17..6963606 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -631,6 +631,12 @@ wsrep::wsrep_provider_v26::bf_abort( return map_return_value(ret); } +enum wsrep::provider::status +wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id) +{ + return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0)); +} + enum wsrep::provider::status wsrep::wsrep_provider_v26::commit_order_enter( const wsrep::ws_handle& ws_handle, diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 6794c55..903b751 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -42,7 +42,7 @@ namespace wsrep bf_abort(wsrep::seqno, wsrep::transaction_id, wsrep::seqno&); - int rollback(const wsrep::transaction_id) { ::abort(); return 0; } + enum wsrep::provider::status rollback(const wsrep::transaction_id); enum wsrep::provider::status commit_order_enter(const wsrep::ws_handle&, const wsrep::ws_meta&); diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index ffcce0d..483e037 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -35,11 +35,12 @@ int wsrep::mock_high_priority_service::apply_write_set( return (fail_next_applying_ ? 1 : 0); } -int wsrep::mock_high_priority_service::commit(const wsrep::ws_handle&, - const wsrep::ws_meta&) +int wsrep::mock_high_priority_service::commit( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) { - int ret(0); + client_state_->prepare_for_ordering(ws_handle, ws_meta, true); if (client_state_->client_service().do_2pc()) { ret = client_state_->before_prepare() || diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index e0d2887..fc7ad15 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -31,6 +31,8 @@ namespace wsrep int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; + const wsrep::transaction& transaction() const + { return client_state_->transaction(); } void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE; diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 576d0c2..18ea0ca 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -131,11 +131,11 @@ namespace wsrep enum wsrep::provider::status append_data(wsrep::ws_handle&, const wsrep::const_buffer&) { return wsrep::provider::success; } - int rollback(const wsrep::transaction_id) + enum wsrep::provider::status rollback(const wsrep::transaction_id) { ++fragments_; ++rollback_fragments_; - return 0; + return wsrep::provider::success; } enum wsrep::provider::status commit_order_enter(const wsrep::ws_handle& ws_handle,