From 265d9b3322ad4fd9a219fb607ac3b3e6b0188815 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Wed, 13 Jun 2018 10:18:46 +0300 Subject: [PATCH] Unit tests for SR with two statements, SR rollback. --- dbsim/dbms_simulator.cpp | 5 +++ include/wsrep/client_context.hpp | 6 +++ include/wsrep/exception.hpp | 3 ++ include/wsrep/server_context.hpp | 23 +++++++++++ include/wsrep/transaction_context.hpp | 1 + src/server_context.cpp | 16 ++++++++ src/transaction_context.cpp | 25 ++++++------ test/mock_provider.hpp | 11 ++++-- test/mock_server_context.hpp | 9 ++++- test/transaction_context_test.cpp | 57 ++++++++++++++++++++++++++- 10 files changed, 140 insertions(+), 16 deletions(-) diff --git a/dbsim/dbms_simulator.cpp b/dbsim/dbms_simulator.cpp index a1f0d74..38a71c7 100644 --- a/dbsim/dbms_simulator.cpp +++ b/dbsim/dbms_simulator.cpp @@ -265,6 +265,11 @@ public: // Client context management wsrep::client_context* local_client_context(); + wsrep::client_context& streaming_applier_client_context( + const wsrep::id&, const wsrep::transaction_id&) override + { + throw wsrep::not_implemented_error(); + } size_t next_transaction_id() { return (last_transaction_id_.fetch_add(1) + 1); diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 0997a46..8dce5d3 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -229,6 +229,12 @@ namespace wsrep return transaction_.start_transaction(wsh, meta); } + void adopt_transaction(wsrep::transaction_context& transaction) + { + transaction_.start_transaction(transaction.id()); + transaction_.streaming_context_ = transaction.streaming_context_; + } + void enable_streaming( enum wsrep::transaction_context::streaming_context::fragment_unit fragment_unit, size_t fragment_size) diff --git a/include/wsrep/exception.hpp b/include/wsrep/exception.hpp index aaf90c0..9fc4fdf 100644 --- a/include/wsrep/exception.hpp +++ b/include/wsrep/exception.hpp @@ -30,6 +30,9 @@ namespace wsrep } }; + class fatal_error : public std::exception + { + }; } diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index ffc0a4c..5f49d64 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -66,6 +66,7 @@ #include #include +#include namespace wsrep { @@ -74,7 +75,9 @@ namespace wsrep class ws_meta; class provider; class client_context; + class transaction_id; class transaction_context; + class id; class gtid; class view; class const_buffer; @@ -198,6 +201,23 @@ namespace wsrep */ virtual client_context* local_client_context() = 0; + /*! + * Create applier context for streaming transaction. + * + * \param server_id Server id of the origin of the SR transaction. + * \param transaction_id Transaction ID of the SR transaction on the + * origin server. + */ + virtual client_context& streaming_applier_client_context( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id + ) = 0; + + void insert_streaming_applier( + const wsrep::id&, + const wsrep::transaction_id&, + wsrep::client_context* client_context); + /*! * Load WSRep provider. * @@ -383,6 +403,7 @@ namespace wsrep , cond_(cond) , state_(s_disconnected) , state_waiters_(n_states_) + , streaming_appliers_() , provider_() , name_(name) , id_(id) @@ -403,6 +424,8 @@ namespace wsrep wsrep::condition_variable& cond_; enum state state_; mutable std::vector state_waiters_; + typedef std::map, wsrep::client_context*> streaming_appliers_map; + streaming_appliers_map streaming_appliers_; wsrep::provider* provider_; std::string name_; std::string id_; diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index 6edba09..c27aa0a 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -135,6 +135,7 @@ namespace wsrep void flags(int flags) { flags_ = flags; } int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); + void streaming_rollback(); void clear_fragments(); void cleanup(); void debug_log_state(const char*) const; diff --git a/src/server_context.cpp b/src/server_context.cpp index 7cbe0aa..c6573f2 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -8,6 +8,7 @@ #include "wsrep/view.hpp" #include "wsrep/logger.hpp" #include "wsrep/compiler.hpp" +#include "wsrep/id.hpp" #include #include @@ -178,6 +179,21 @@ bool wsrep::server_context::statement_allowed_for_streaming( return false; } +void wsrep::server_context::insert_streaming_applier( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id, + wsrep::client_context* client_context) +{ + if (streaming_appliers_.insert( + std::make_pair(std::make_pair(server_id, transaction_id), + client_context)).second == false) + { + wsrep::log_error() << "Could not insert streaming applier"; + delete client_context; + throw wsrep::fatal_error(); + } +} + // Private void wsrep::server_context::state( diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 3579a9a..e77e1c3 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -367,8 +367,7 @@ int wsrep::transaction_context::before_rollback() // Voluntary rollback if (is_streaming()) { - // Replicate rollback fragment - provider_.rollback(id_.get()); + streaming_rollback(); } state(lock, s_aborting); break; @@ -652,11 +651,6 @@ int wsrep::transaction_context::certify_fragment( lock.unlock(); - if (streaming_context_.fragments_certified()) - { - flags_ |= wsrep::provider::flag::start_transaction; - } - wsrep::mutable_buffer data; if (client_context_.prepare_fragment_for_replication(*this, data)) { @@ -869,6 +863,18 @@ int wsrep::transaction_context::certify_commit( return ret; } +void wsrep::transaction_context::streaming_rollback() +{ + assert(streaming_context_.rolled_back() == false); + wsrep::client_context& applier_context( + client_context_.server_context().streaming_applier_client_context( + client_context_.server_context().id(), id())); + applier_context.adopt_transaction(*this); + streaming_context_.cleanup(); + // Replicate rollback fragment + provider_.rollback(id_.get()); +} + void wsrep::transaction_context::clear_fragments() { streaming_context_.cleanup(); @@ -876,13 +882,10 @@ void wsrep::transaction_context::clear_fragments() void wsrep::transaction_context::cleanup() { + assert(is_streaming() == false); debug_log_state("cleanup_enter"); id_ = wsrep::transaction_id::invalid(); ws_handle_ = wsrep::ws_handle(); - if (is_streaming()) - { - state_ = s_executing; - } // Keep the state history for troubleshooting. Reset at start_transaction(). // state_hist_.clear(); ws_meta_ = wsrep::ws_meta(); diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 10656bc..1aadc45 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -53,7 +53,7 @@ namespace wsrep << "client: " << client_id.get() << " flags: " << std::hex << flags << std::dec - << "next_error: " << next_error_; + << " next_error: " << next_error_; if (next_error_) { @@ -71,6 +71,7 @@ namespace wsrep } if (rolls_back_transaction(flags)) { + assert(0); ++rollback_fragments_; } @@ -116,7 +117,11 @@ namespace wsrep int append_data(wsrep::ws_handle&, const wsrep::const_buffer&) { return 0; } int rollback(const wsrep::transaction_id) - { return next_error_; } + { + ++fragments_; + ++rollback_fragments_; + return 0; + } enum wsrep::provider::status commit_order_enter(const wsrep::ws_handle&, const wsrep::ws_meta&) @@ -170,7 +175,7 @@ namespace wsrep size_t start_fragments() const { return start_fragments_; } size_t fragments() const { return fragments_; } size_t commit_fragments() const { return commit_fragments_; } - size_t rollback_fragments() const { return commit_fragments_; } + size_t rollback_fragments() const { return rollback_fragments_; } private: wsrep::id group_id_; diff --git a/test/mock_server_context.hpp b/test/mock_server_context.hpp index 6dedfa4..6b6d462 100644 --- a/test/mock_server_context.hpp +++ b/test/mock_server_context.hpp @@ -33,7 +33,14 @@ namespace wsrep return new wsrep::mock_client_context(*this, ++last_client_id_, wsrep::client_context::m_local); } - + wsrep::client_context& streaming_applier_client_context( + const wsrep::id& server_id, + const wsrep::transaction_id& transaction_id) + { + wsrep::client_context* sac(new wsrep::mock_client_context(*this, ++last_client_id_, wsrep::client_context::m_applier)); + insert_streaming_applier(server_id, transaction_id, sac); + return *sac; + } void on_connect() WSREP_OVERRIDE { } void wait_until_connected() WSREP_OVERRIDE { } void on_view(const wsrep::view&) WSREP_OVERRIDE { } diff --git a/test/transaction_context_test.cpp b/test/transaction_context_test.cpp index 33b37e4..83445dd 100644 --- a/test/transaction_context_test.cpp +++ b/test/transaction_context_test.cpp @@ -1202,7 +1202,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit, BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().commit_fragments() == 1); - } BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit, @@ -1222,3 +1221,59 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit, BOOST_REQUIRE(sc.provider().commit_fragments() == 1); } + + +BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit_two_statements, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 2); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 3); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); + +} + +BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit_two_statements, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 2); + BOOST_REQUIRE(cc.before_prepare() == 0); + BOOST_REQUIRE(cc.after_prepare() == 0); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 3); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); +} + +BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_rollback, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_rollback() == 0); + BOOST_REQUIRE(cc.after_rollback() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().rollback_fragments() == 1); +}