From 4c1ea388f85e9d167366c505fb0a1bdf87d49b97 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Wed, 13 Jun 2018 15:34:49 +0300 Subject: [PATCH] Call before_prepare()/after_prepare() for applier in before_commit() if no 2PC is required. --- src/transaction_context.cpp | 34 ++++++++++++++----- test/client_context_fixture.hpp | 49 +++++++++++++++++++++++++++ test/transaction_context_test_2pc.cpp | 14 ++++---- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index d99aabb..51cf75a 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -132,7 +132,8 @@ int wsrep::transaction_context::before_prepare( assert(lock.owns_lock()); int ret(0); debug_log_state("before_prepare_enter"); - assert(state() == s_executing || state() == s_must_abort); + assert(state() == s_executing || state() == s_must_abort || + state() == s_replaying); if (state() == s_must_abort) { @@ -141,7 +142,10 @@ int wsrep::transaction_context::before_prepare( return 1; } - state(lock, s_preparing); + if (state() != s_replaying) + { + state(lock, s_preparing); + } switch (client_context_.mode()) { @@ -168,14 +172,17 @@ int wsrep::transaction_context::before_prepare( break; case wsrep::client_context::m_local: case wsrep::client_context::m_applier: - client_context_.remove_fragments(*this); + if (is_streaming()) + { + client_context_.remove_fragments(*this); + } break; default: assert(0); break; } - assert(state() == s_preparing); + assert(state() == s_preparing || state() == s_replaying); debug_log_state("before_prepare_leave"); return ret; } @@ -186,7 +193,8 @@ int wsrep::transaction_context::after_prepare( assert(lock.owns_lock()); int ret(1); debug_log_state("after_prepare_enter"); - assert(state() == s_preparing || state() == s_must_abort); + assert(state() == s_preparing || state() == s_must_abort || + state() == s_replaying); if (state() == s_must_abort) { assert(client_context_.mode() == wsrep::client_context::m_replicating); @@ -205,8 +213,11 @@ int wsrep::transaction_context::after_prepare( break; case wsrep::client_context::m_local: case wsrep::client_context::m_applier: - state(lock, s_certifying); - state(lock, s_committing); + if (state() != s_replaying) + { + state(lock, s_certifying); + state(lock, s_committing); + } ret = 0; break; default: @@ -241,6 +252,7 @@ int wsrep::transaction_context::before_commit() case wsrep::client_context::m_replicating: if (state() == s_executing) { + assert(client_context_.do_2pc() == false); ret = before_prepare(lock) || after_prepare(lock); assert((ret == 0 && state() == s_committing) || @@ -297,9 +309,13 @@ int wsrep::transaction_context::before_commit() assert(ordered()); if (client_context_.do_2pc() == false) { - client_context_.remove_fragments(*this); + ret = before_prepare(lock) || after_prepare(lock); } - ret = provider_.commit_order_enter(ws_handle_, ws_meta_); + else + { + ret = 0; + } + ret = ret || provider_.commit_order_enter(ws_handle_, ws_meta_); if (ret) { state(lock, s_must_abort); diff --git a/test/client_context_fixture.hpp b/test/client_context_fixture.hpp index 91b95e5..3d2244d 100644 --- a/test/client_context_fixture.hpp +++ b/test/client_context_fixture.hpp @@ -51,6 +51,25 @@ namespace const wsrep::transaction_context& tc; }; + struct replicating_client_fixture_2pc + { + replicating_client_fixture_2pc() + : sc("s1", "s1", wsrep::server_context::rm_sync) + , cc(sc, wsrep::client_id(1), + wsrep::client_context::m_replicating, false, true) + , tc(cc.transaction()) + { + BOOST_REQUIRE(cc.before_command() == 0); + BOOST_REQUIRE(cc.before_statement() == 0); + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); + } + wsrep::fake_server_context sc; + wsrep::fake_client_context cc; + const wsrep::transaction_context& tc; + }; + struct replicating_client_fixture_autocommit { replicating_client_fixture_autocommit() @@ -100,6 +119,36 @@ namespace const wsrep::transaction_context& tc; }; + struct applying_client_fixture_2pc + { + applying_client_fixture_2pc() + : sc("s1", "s1", + wsrep::server_context::rm_async) + , cc(sc, + wsrep::client_id(1), + wsrep::client_context::m_applier, false, true) + , tc(cc.transaction()) + { + BOOST_REQUIRE(cc.before_command() == 0); + BOOST_REQUIRE(cc.before_statement() == 0); + wsrep::ws_handle ws_handle(1, (void*)1); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), + wsrep::stid(sc.id(), 1, cc.id()), + wsrep::seqno(0), + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit); + BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(cc.start_transaction() == 0); + BOOST_REQUIRE(tc.active() == true); + BOOST_REQUIRE(tc.certified() == true); + BOOST_REQUIRE(tc.ordered() == true); + } + wsrep::fake_server_context sc; + wsrep::fake_client_context cc; + const wsrep::transaction_context& tc; + }; + struct streaming_client_fixture_row { streaming_client_fixture_row() diff --git a/test/transaction_context_test_2pc.cpp b/test/transaction_context_test_2pc.cpp index ce81f78..735e86f 100644 --- a/test/transaction_context_test_2pc.cpp +++ b/test/transaction_context_test_2pc.cpp @@ -8,7 +8,7 @@ // Test a succesful 2PC transaction lifecycle // BOOST_FIXTURE_TEST_CASE(transaction_context_2pc, - replicating_client_fixture_sync_rm) + replicating_client_fixture_2pc) { cc.start_transaction(1); BOOST_REQUIRE(tc.active()); @@ -36,7 +36,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_2pc, // BOOST_FIXTURE_TEST_CASE( transaction_context_2pc_bf_before_before_prepare, - replicating_client_fixture_sync_rm) + replicating_client_fixture_2pc) { cc.start_transaction(1); BOOST_REQUIRE(tc.active()); @@ -63,7 +63,7 @@ BOOST_FIXTURE_TEST_CASE( // BOOST_FIXTURE_TEST_CASE( transaction_context_2pc_bf_before_after_prepare, - replicating_client_fixture_sync_rm) + replicating_client_fixture_2pc) { cc.start_transaction(1); BOOST_REQUIRE(tc.active()); @@ -93,7 +93,7 @@ BOOST_FIXTURE_TEST_CASE( // BOOST_FIXTURE_TEST_CASE( transaction_context_2pc_bf_after_after_prepare, - replicating_client_fixture_sync_rm) + replicating_client_fixture_2pc) { cc.start_transaction(1); BOOST_REQUIRE(tc.active()); @@ -121,7 +121,7 @@ BOOST_FIXTURE_TEST_CASE( // BOOST_FIXTURE_TEST_CASE( transaction_context_2pc_bf_before_before_commit, - replicating_client_fixture_sync_rm) + replicating_client_fixture_2pc) { cc.start_transaction(1); BOOST_REQUIRE(tc.active()); @@ -154,7 +154,7 @@ BOOST_FIXTURE_TEST_CASE( // BOOST_FIXTURE_TEST_CASE( transaction_context_2pc_bf_during_commit_order_enter, - replicating_client_fixture_sync_rm) + replicating_client_fixture_2pc) { cc.start_transaction(1); BOOST_REQUIRE(tc.active()); @@ -227,7 +227,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit_two_statements, /////////////////////////////////////////////////////////////////////////////// BOOST_FIXTURE_TEST_CASE(transaction_context_2pc_applying, - applying_client_fixture) + applying_client_fixture_2pc) { BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_preparing);