From 7c424d833757007821eefa1d6a5e6689d411d521 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Sun, 8 Jul 2018 15:27:49 +0300 Subject: [PATCH] Fixes to local streaming replication processing. --- dbsim/db_storage_service.hpp | 2 +- include/wsrep/client_state.hpp | 7 +- include/wsrep/storage_service.hpp | 5 +- include/wsrep/streaming_context.hpp | 3 +- src/server_state.cpp | 15 ++++ src/transaction.cpp | 122 +++++++++++++++++----------- test/mock_high_priority_service.cpp | 2 +- test/mock_storage_service.cpp | 29 +++---- test/mock_storage_service.hpp | 2 +- test/server_context_test.cpp | 4 +- test/transaction_test.cpp | 4 +- test/transaction_test_2pc.cpp | 2 +- 12 files changed, 122 insertions(+), 75 deletions(-) diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index 28eb78e..48a23a6 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -12,7 +12,7 @@ namespace db { class storage_service : public wsrep::storage_service { - int start_transaction() override + int start_transaction(const wsrep::ws_handle&) override { throw wsrep::not_implemented_error(); } int append_fragment(const wsrep::id&, wsrep::transaction_id, diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 916ce19..6238ace 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -307,6 +307,12 @@ namespace wsrep fragment_unit, size_t fragment_size); + void disable_streaming() + { + assert(state_ == s_exec && mode_ == m_local); + transaction_.streaming_context_.disable(); + } + /** * Prepare write set meta data for fragment storage ordering. * This method should be called from storage service commit @@ -316,7 +322,6 @@ namespace wsrep const wsrep::ws_meta& ws_meta, bool is_commit) { - assert(mode_ == m_high_priority); assert(state_ == s_exec); return transaction_.prepare_for_fragment_ordering( ws_handle, ws_meta, is_commit); diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index e8f5f24..6fa2737 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -35,12 +35,11 @@ namespace wsrep /** * Start a new transaction for storage access. * - * @param ws_hande Write set handle - * @param ws_meta Write set meta data + * @param[out] ws_handle Write set handle for a new transaction * * @return Zero in case of success, non-zero on error. */ - virtual int start_transaction() = 0; + virtual int start_transaction(const wsrep::ws_handle&) = 0; /** * Append fragment into stable storage. diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 6be91c2..ef51a28 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -46,9 +46,10 @@ namespace wsrep fragment_size_ = 0; } - void certified(wsrep::seqno seqno) + void certified(wsrep::seqno seqno, size_t bytes) { fragments_.push_back(seqno); + bytes_certified_ += bytes; } size_t fragments_certified() const diff --git a/src/server_state.cpp b/src/server_state.cpp index 9b0efc0..d5c5d55 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -463,6 +463,20 @@ void wsrep::server_state::on_view(const wsrep::view& view) if (view.status() == wsrep::view::primary) { wsrep::unique_lock lock(mutex_); + if (view.own_index() >= 0) + { + if (id_.is_undefined()) + { + // No identifier was passed during server state initialization + // and the ID was generated by the provider. + id_ = view.members()[view.own_index()].id(); + } + else + { + // Own identifier must not change between views. + // assert(id_ == view.members()[view.own_index()].id()); + } + } assert(view.final() == false); // @@ -555,6 +569,7 @@ void wsrep::server_state::on_view(const wsrep::view& view) assert(view.final()); wsrep::unique_lock lock(mutex_); state(lock, s_disconnected); + id_ = wsrep::id::undefined(); } } diff --git a/src/transaction.cpp b/src/transaction.cpp index 464face..0c7f337 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -64,9 +64,8 @@ namespace ~scoped_storage_service() { - storage_service_->reset_globals(); - client_service_.store_globals(); deleter_(storage_service_); + client_service_.store_globals(); } private: scoped_storage_service(const scoped_storage_service&); @@ -666,8 +665,15 @@ int wsrep::transaction::after_statement() void wsrep::transaction::after_applying() { wsrep::unique_lock lock(client_state_.mutex_); - assert(state_ == s_committed || state_ == s_aborted); - cleanup(); + debug_log_state("after_applying enter"); + assert(state_ == s_executing || + state_ == s_committed || + state_ == s_aborted); + if (state_ != s_executing) + { + cleanup(); + } + debug_log_state("after_applying leave"); } bool wsrep::transaction::bf_abort( @@ -820,60 +826,80 @@ int wsrep::transaction::certify_fragment( return 1; } - // The rest of this method will be executed in storage service - // scope. - scoped_storage_service - sr_scope( - client_service_, - server_service_.storage_service(client_service_), - storage_service_deleter(server_service_)); - wsrep::storage_service& storage_service( - sr_scope.storage_service()); - - // First the fragment is appended to the stable storage. - // This is done to ensure that there is enough capacity - // available to store the fragment. The fragment meta data - // is updated after certification. - if (storage_service.start_transaction()) + if (provider().append_data(ws_handle_, + wsrep::const_buffer(data.data(), data.size()))) { lock.lock(); state(lock, s_must_abort); - client_state_.override_error(wsrep::e_append_fragment_error); return 1; } - if (storage_service.append_fragment( - client_state_.server_state().id(), - id(), - flags_, - wsrep::const_buffer(data.data(), data.size()))) - { - lock.lock(); - state(lock, s_must_abort); - client_state_.override_error(wsrep::e_append_fragment_error); - return 1; - } - - enum wsrep::provider::status - cert_ret(provider().certify(client_state_.id().get(), - ws_handle_, - flags_, - ws_meta_)); - int ret(0); - switch (cert_ret) + // Storage service scope { - case wsrep::provider::success: - streaming_context_.certified(ws_meta_.seqno()); - if (storage_service.commit(ws_handle_, ws_meta_)) + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + + // First the fragment is appended to the stable storage. + // This is done to ensure that there is enough capacity + // available to store the fragment. The fragment meta data + // is updated after certification. + if (storage_service.start_transaction(ws_handle_)) { - ret = 1; + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_append_fragment_error); + return 1; } - break; - default: - storage_service.rollback(ws_handle_, ws_meta_); - ret = 1; - break; + + wsrep::id server_id(client_state_.server_state().id()); + assert(server_id.is_undefined() == false); + if (storage_service.append_fragment( + server_id, + id(), + flags_, + wsrep::const_buffer(data.data(), data.size()))) + { + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_append_fragment_error); + return 1; + } + + wsrep::ws_meta sr_ws_meta; + enum wsrep::provider::status + cert_ret(provider().certify(client_state_.id().get(), + ws_handle_, + flags_, + sr_ws_meta)); + switch (cert_ret) + { + case wsrep::provider::success: + assert(sr_ws_meta.seqno().is_undefined() == false); + streaming_context_.certified(sr_ws_meta.seqno(), data.size()); + if (storage_service.update_fragment_meta(sr_ws_meta)) + { + ret = 1; + break; + } + wsrep::log_info() << "Committing " + << sr_ws_meta.transaction_id().get(); + if (storage_service.commit(ws_handle_, sr_ws_meta)) + { + ret = 1; + } + break; + default: + storage_service.rollback(ws_handle_, sr_ws_meta); + ret = 1; + break; + } + provider().release(ws_handle_); } lock.lock(); if (ret) diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 6bd1e32..14bf95d 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -64,5 +64,5 @@ int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&, void wsrep::mock_high_priority_service::after_apply() { - client_state_->after_statement(); + client_state_->after_applying(); } diff --git a/test/mock_storage_service.cpp b/test/mock_storage_service.cpp index 9ae0899..a2325c0 100644 --- a/test/mock_storage_service.cpp +++ b/test/mock_storage_service.cpp @@ -28,29 +28,30 @@ wsrep::mock_storage_service::~mock_storage_service() client_state_.cleanup(); } -int wsrep::mock_storage_service::start_transaction() +int wsrep::mock_storage_service::start_transaction(const wsrep::ws_handle& ws_handle) { - return client_state_.start_transaction( - server_state_.next_transaction_id()); + return client_state_.start_transaction(ws_handle.transaction_id()); } int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - return client_state_.prepare_for_fragment_ordering( - ws_handle, ws_meta, true) || - client_state_.before_commit() || - client_state_.ordered_commit() || - client_state_.after_commit() || - client_state_.after_statement(); + int ret(client_state_.prepare_for_fragment_ordering( + ws_handle, ws_meta, true) || + client_state_.before_commit() || + client_state_.ordered_commit() || + client_state_.after_commit()); + client_state_.after_applying(); + return ret; } int wsrep::mock_storage_service::rollback(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - return client_state_.prepare_for_fragment_ordering( - ws_handle, ws_meta, false) || - client_state_.before_rollback() || - client_state_.after_rollback() || - client_state_.after_statement(); + int ret(client_state_.prepare_for_fragment_ordering( + ws_handle, ws_meta, false) || + client_state_.before_rollback() || + client_state_.after_rollback()); + client_state_.after_applying(); + return ret; } diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index 2270db4..feba800 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -18,7 +18,7 @@ class mock_server_state; wsrep::client_id); ~mock_storage_service(); - int start_transaction() WSREP_OVERRIDE; + int start_transaction(const wsrep::ws_handle&) WSREP_OVERRIDE; int append_fragment(const wsrep::id&, wsrep::transaction_id, diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index b544b3b..fb959e3 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -172,7 +172,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap, wsrep::id cluster_id("1"); wsrep::gtid state_id(cluster_id, wsrep::seqno(0)); std::vector members; - members.push_back(wsrep::view::member(wsrep::id("1"), "name", "")); + members.push_back(wsrep::view::member(wsrep::id("s1"), "name", "")); wsrep::view bootstrap_view(state_id, wsrep::seqno(1), wsrep::view::primary, @@ -198,7 +198,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_init_first_boostrap, wsrep::id cluster_id("1"); wsrep::gtid state_id(cluster_id, wsrep::seqno(0)); std::vector members; - members.push_back(wsrep::view::member(wsrep::id("1"), "name", "")); + members.push_back(wsrep::view::member(wsrep::id("s1"), "name", "")); wsrep::view bootstrap_view(state_id, wsrep::seqno(1), wsrep::view::primary, diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index f658418..4fbf8df 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -950,7 +950,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_1pc_applying, BOOST_REQUIRE(tc.state() == wsrep::transaction::s_ordered_commit); BOOST_REQUIRE(cc.after_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed); - cc.after_statement(); + cc.after_applying(); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_success); @@ -964,7 +964,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_applying_rollback, BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborting); BOOST_REQUIRE(cc.after_rollback() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); - cc.after_statement(); + cc.after_applying(); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_success); diff --git a/test/transaction_test_2pc.cpp b/test/transaction_test_2pc.cpp index b15ab59..8ada781 100644 --- a/test/transaction_test_2pc.cpp +++ b/test/transaction_test_2pc.cpp @@ -239,7 +239,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_2pc_applying, BOOST_REQUIRE(tc.state() == wsrep::transaction::s_ordered_commit); BOOST_REQUIRE(cc.after_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed); - BOOST_REQUIRE(cc.after_statement() == 0); + cc.after_applying(); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_success);