From e18c9d597fb6e2e1518bfff46cacfa583bc8a44a Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Sun, 10 Jun 2018 19:27:09 +0300 Subject: [PATCH] * Unit test for idle client BF abort. * Fixes to seqno conversion between provider and provider library. * Server context applying side fixes. --- include/wsrep/client_context.hpp | 23 +++++++++++--------- include/wsrep/mutex.hpp | 1 + include/wsrep/server_context.hpp | 4 +++- include/wsrep/transaction_context.hpp | 3 ++- src/client_context.cpp | 15 ++++++++++++-- src/dbms_simulator.cpp | 7 ++++++- src/mock_server_context.hpp | 6 ++++++ src/server_context.cpp | 25 +++++++++++----------- src/server_context_test.cpp | 29 +++++++++++++++++++------- src/transaction_context.cpp | 20 +++++++++++++++--- src/transaction_context_test.cpp | 24 ++++++++++++++++++++- src/wsrep_provider_v26.cpp | 30 ++++++++++++++++++--------- 12 files changed, 137 insertions(+), 50 deletions(-) diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index de1a7b9..c598a6e 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -210,6 +210,7 @@ namespace wsrep assert(state_ == s_exec); return transaction_.start_transaction(); } + int start_transaction(const wsrep::transaction_id& id) { assert(state_ == s_exec); @@ -265,12 +266,12 @@ namespace wsrep } int before_rollback() { - assert(state_ == s_exec); + assert(state_ == s_idle || state_ == s_exec || state_ == s_result); return transaction_.before_rollback(); } int after_rollback() { - assert(state_ == s_exec); + assert(state_ == s_idle || state_ == s_exec || state_ == s_result); return transaction_.after_rollback(); } @@ -319,6 +320,14 @@ namespace wsrep * \return Client mode. */ enum mode mode() const { return mode_; } + /*! + * Get Client state. + * + * \todo Enforce mutex protection if called from other threads. + * + * \return Client state + */ + enum state state() const { return state_; } const wsrep::transaction_context& transaction() const { @@ -369,20 +378,14 @@ namespace wsrep * Friend declarations */ friend int server_context::on_apply(client_context&, + const wsrep::ws_handle&, + const wsrep::ws_meta&, const wsrep::data&); friend class client_context_switch; friend class client_applier_mode; friend class client_toi_mode; friend class transaction_context; - /*! - * Get Client state. - * - * \todo Enforce mutex protection if called from other threads. - * - * \return Client state - */ - enum state state() const { return state_; } /*! * Set client state. diff --git a/include/wsrep/mutex.hpp b/include/wsrep/mutex.hpp index ee5cb02..29225dc 100644 --- a/include/wsrep/mutex.hpp +++ b/include/wsrep/mutex.hpp @@ -69,6 +69,7 @@ namespace wsrep { return &mutex_; } + private: pthread_mutex_t mutex_; }; diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index 00ad608..aacd04a 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -295,7 +295,7 @@ namespace wsrep const wsrep::gtid& gtid, bool bypass) = 0; - + virtual void background_rollback(wsrep::client_context&) = 0; /*! * */ @@ -333,6 +333,8 @@ namespace wsrep * \return Zero on success, non-zero on failure. */ int on_apply(wsrep::client_context& client_context, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, const wsrep::data& data); /*! diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index 98b261c..b911841 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -83,6 +83,7 @@ namespace wsrep assert(ws_meta_.transaction_id() != transaction_id::invalid()); return start_transaction(ws_meta_.transaction_id()); } + int start_transaction(const wsrep::transaction_id& id); int start_transaction(const wsrep::ws_handle& ws_handle, @@ -121,8 +122,8 @@ namespace wsrep } wsrep::mutex& mutex(); - wsrep::ws_handle& ws_handle() { return ws_handle_; } + const wsrep::ws_handle& ws_handle() const { return ws_handle_; } const wsrep::ws_meta& ws_meta() const { return ws_meta_; } private: transaction_context(const transaction_context&); diff --git a/src/client_context.cpp b/src/client_context.cpp index da6da91..943bc40 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -35,6 +35,7 @@ int wsrep::client_context::before_command() (transaction_.state() == wsrep::transaction_context::s_must_abort || transaction_.state() == wsrep::transaction_context::s_aborted)) { + override_error(wsrep::e_deadlock_error); return 1; } return 0; @@ -67,13 +68,23 @@ void wsrep::client_context::after_command_after_result() { // Note: Error is not overridden here as the result has already // been sent to client. The error should be set in before_command() - // when the client issues next command. + // when the client issues next command and finds the transaction + // in aborted state. lock.unlock(); rollback(); transaction_.after_statement(); lock.lock(); assert(transaction_.state() == wsrep::transaction_context::s_aborted); - assert(current_error() != wsrep::e_success); + assert(current_error() == wsrep::e_success); + } + else if (transaction_.active() && + transaction_.state() == wsrep::transaction_context::s_aborted) + { + // Will clean up the transaction + lock.unlock(); + (void)transaction_.after_statement(); + lock.lock(); + current_error_ = wsrep::e_success; } state(lock, s_idle); } diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index b8a4e6a..2714aa3 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -253,7 +253,12 @@ public: { simulator_.donate_sst(*this, req, gtid, bypass); } - + void background_rollback(wsrep::client_context& cs) override + { + assert(0); + cs.before_rollback(); + cs.after_rollback(); + } // Client context management wsrep::client_context* local_client_context(); diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp index 9c07e5c..08e33d1 100644 --- a/src/mock_server_context.hpp +++ b/src/mock_server_context.hpp @@ -43,6 +43,12 @@ namespace wsrep void on_sst_request(const std::string&, const wsrep::gtid&, bool) WSREP_OVERRIDE { } + void background_rollback(wsrep::client_context& client_context) + WSREP_OVERRIDE + { + client_context.before_rollback(); + client_context.after_rollback(); + } // void sst_received(const wsrep_gtid_t&, int) WSREP_OVERRIDE { } // void on_apply(wsrep::transaction_context&) { } // void on_commit(wsrep::transaction_context&) { } diff --git a/src/server_context.cpp b/src/server_context.cpp index d47ec77..0c7789e 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -152,15 +152,9 @@ namespace meta->stid.trx, meta->stid.conn), meta->depends_on, map_flags_from_native(flags)); - if (client_context->transaction().state() != - wsrep::transaction_context::s_replaying && - client_context->start_transaction(ws_handle, ws_meta)) - { - ret = WSREP_CB_FAILURE; - } if (ret == WSREP_CB_SUCCESS && client_context->server_context().on_apply( - *client_context, data)) + *client_context, ws_handle, ws_meta, data)) { ret = WSREP_CB_FAILURE; } @@ -337,24 +331,27 @@ void wsrep::server_context::on_sync() int wsrep::server_context::on_apply( wsrep::client_context& client_context, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, const wsrep::data& data) { int ret(0); const wsrep::transaction_context& txc(client_context.transaction()); - const wsrep::ws_meta& ws_meta(txc.ws_meta()); // wsrep::log_debug() << "server_context::on apply flags: " // << flags_to_string(ws_meta.flags()); + assert(ws_handle.opaque()); assert(ws_meta.flags()); + bool not_replaying(txc.state() != + wsrep::transaction_context::s_replaying); + if (starts_transaction(ws_meta) && commits_transaction(ws_meta)) { - bool not_replaying(txc.state() != - wsrep::transaction_context::s_replaying); if (not_replaying) { client_context.before_command(); client_context.before_statement(); assert(txc.active() == false); - client_context.start_transaction(); + client_context.start_transaction(ws_handle, ws_meta); } if (client_context.apply(data)) { @@ -384,8 +381,10 @@ int wsrep::server_context::on_apply( // SR not implemented yet assert(0); } - - assert(txc.active() == false); + if (not_replaying) + { + assert(txc.active() == false); + } return ret; } diff --git a/src/server_context_test.cpp b/src/server_context_test.cpp index 7aa9f2b..f8f0edf 100644 --- a/src/server_context_test.cpp +++ b/src/server_context_test.cpp @@ -18,14 +18,23 @@ namespace wsrep::client_id(1), wsrep::client_context::m_applier, false) + , ws_handle(1, (void*)1) + , ws_meta(wsrep::gtid(wsrep::id("1"), 1), + wsrep::stid(wsrep::id("1"), 1, 1), + 0, + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit) { - wsrep_mock::start_applying_transaction( - cc, 1, 1, - wsrep::provider::flag::start_transaction | - wsrep::provider::flag::commit); + // wsrep_mock::start_applying_transaction( + // cc, 1, 1, + // wsrep::provider::flag::start_transaction | + // wsrep::provider::flag::commit); + cc.start_transaction(ws_handle, ws_meta); } wsrep::mock_server_context sc; wsrep::mock_client_context cc; + wsrep::ws_handle ws_handle; + wsrep::ws_meta ws_meta; }; } @@ -35,7 +44,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc, { cc.debug_log_level(1); char buf[1] = { 1 }; - BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 0); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::data(buf, 1)) == 0); const wsrep::transaction_context& txc(cc.transaction()); // ::abort(); BOOST_REQUIRE_MESSAGE( @@ -48,7 +58,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc, applying_server_fixture) { char buf[1] = { 1 }; - BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 0); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::data(buf, 1)) == 0); const wsrep::transaction_context& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_committed); } @@ -60,7 +71,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc_rollback, { cc.fail_next_applying(true); char buf[1] = { 1 }; - BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 1); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::data(buf, 1)) == 1); const wsrep::transaction_context& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); } @@ -72,7 +84,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback, { cc.fail_next_applying(true); char buf[1] = { 1 }; - BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 1); + BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, + wsrep::data(buf, 1)) == 1); const wsrep::transaction_context& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); } diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index f92087c..1234cee 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -336,6 +336,7 @@ int wsrep::transaction_context::before_rollback() debug_log_state("before_rollback_enter"); assert(state() == s_executing || state() == s_must_abort || + state() == s_aborting || // Background rollbacker state() == s_cert_failed || state() == s_must_replay); @@ -373,6 +374,12 @@ int wsrep::transaction_context::before_rollback() } state(lock, s_aborting); break; + case s_aborting: + if (is_streaming()) + { + provider_.rollback(id_.get()); + } + break; case s_must_replay: break; default: @@ -538,11 +545,18 @@ bool wsrep::transaction_context::bf_abort( if (ret) { bf_abort_client_state_ = client_context_.state(); - if (client_context_.server_context().rollback_mode() == + if (client_context_.state() == wsrep::client_context::s_idle && + client_context_.server_context().rollback_mode() == wsrep::server_context::rm_sync) { - //! \todo Launch background rollbacker. - assert(0); + // We need to change the state to aborting under the + // lock protection to avoid a race between client thread, + // otherwise it could happend that the client gains control + // between releasing the lock and before background + // rollbacker gets control. + state(lock, wsrep::transaction_context::s_aborting); + lock.unlock(); + client_context_.server_context().background_rollback(client_context_); } } return ret; diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index 50008dd..20e390b 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -17,7 +17,7 @@ namespace struct replicating_client_fixture { replicating_client_fixture() - : sc("s1", "s1", wsrep::server_context::rm_async) + : sc("s1", "s1", wsrep::server_context::rm_sync) , cc(sc, wsrep::client_id(1), wsrep::client_context::m_replicating) , tc(cc.transaction()) @@ -361,6 +361,28 @@ BOOST_FIXTURE_TEST_CASE( BOOST_REQUIRE(cc.current_error() == wsrep::e_success); } +BOOST_FIXTURE_TEST_CASE(transaction_context_1pc_bf_abort_idle_client, + replicating_client_fixture) +{ + cc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + cc.after_statement(); + BOOST_REQUIRE(cc.state() == wsrep::client_context::s_exec); + cc.after_command_before_result(); + BOOST_REQUIRE(cc.state() == wsrep::client_context::s_result); + cc.after_command_after_result(); + BOOST_REQUIRE(cc.state() == wsrep::client_context::s_idle); + wsrep_mock::bf_abort_unordered(cc); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_aborted); + BOOST_REQUIRE(cc.before_command() == 1); + BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); + cc.after_command_before_result(); + BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); + cc.after_command_after_result(); + BOOST_REQUIRE(cc.current_error() == wsrep::e_success); + BOOST_REQUIRE(tc.active() == false); +} + BOOST_FIXTURE_TEST_CASE(transaction_context_1pc_applying, applying_client_fixture) { diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 8eef209..368e568 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -59,6 +59,15 @@ namespace throw wsrep::runtime_error("Invalid key type"); } + static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno) + { + return (seqno.nil() ? WSREP_SEQNO_UNDEFINED : seqno.get()); + } + + static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno) + { + return (seqno == WSREP_SEQNO_UNDEFINED ? 0 : seqno); + } inline uint32_t map_one(const int flags, const int from, const uint32_t to) { @@ -138,7 +147,7 @@ namespace mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags) : ws_meta_(ws_meta) , trx_meta_() - , flags_(map_flags_to_native(flags)) + , flags_(flags) { } ~mutable_ws_meta() @@ -147,12 +156,12 @@ namespace wsrep::gtid( wsrep::id(trx_meta_.gtid.uuid.data, sizeof(trx_meta_.gtid.uuid.data)), - trx_meta_.gtid.seqno), + seqno_from_native(trx_meta_.gtid.seqno)), wsrep::stid(wsrep::id(trx_meta_.stid.node.data, sizeof(trx_meta_.stid.node.data)), trx_meta_.stid.trx, trx_meta_.stid.conn), - trx_meta_.depends_on, flags_); + seqno_from_native(trx_meta_.depends_on), flags_); } wsrep_trx_meta* native() { return &trx_meta_; } @@ -172,12 +181,12 @@ namespace { std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(), sizeof(trx_meta_.gtid.uuid.data)); - trx_meta_.gtid.seqno = ws_meta.seqno().get(); + trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno()); std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(), sizeof(trx_meta_.stid.node.data)); trx_meta_.stid.conn = ws_meta.client_id().get(); trx_meta_.stid.trx = ws_meta.transaction_id().get(); - trx_meta_.depends_on = ws_meta.depends_on().get(); + trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on()); } ~const_ws_meta() @@ -286,9 +295,9 @@ int wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle, enum wsrep::provider::status wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id, - wsrep::ws_handle& ws_handle, - int flags, - wsrep::ws_meta& ws_meta) + wsrep::ws_handle& ws_handle, + int flags, + wsrep::ws_meta& ws_meta) { mutable_ws_handle mwsh(ws_handle); mutable_ws_meta mmeta(ws_meta, flags); @@ -307,8 +316,9 @@ wsrep::wsrep_provider_v26::bf_abort( wsrep_seqno_t wsrep_victim_seqno; wsrep_status_t ret( wsrep_->abort_certification( - wsrep_, bf_seqno.get(), victim_id.get(), &wsrep_victim_seqno)); - victim_seqno = wsrep_victim_seqno; + wsrep_, seqno_to_native(bf_seqno), + victim_id.get(), &wsrep_victim_seqno)); + victim_seqno = seqno_from_native(wsrep_victim_seqno); return map_return_value(ret); }