diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 040e000..9933fe4 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -242,7 +242,7 @@ namespace wsrep } int enable_streaming( - enum wsrep::transaction_context::streaming_context::fragment_unit + enum wsrep::streaming_context::fragment_unit fragment_unit, size_t fragment_size) { diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp new file mode 100644 index 0000000..bbf0c9c --- /dev/null +++ b/include/wsrep/streaming_context.hpp @@ -0,0 +1,97 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_STREAMING_CONTEXT_HPP +#define WSREP_STREAMING_CONTEXT_HPP + +namespace wsrep +{ + class streaming_context + { + public: + enum fragment_unit + { + row, + bytes, + statement + }; + + streaming_context() + : fragments_() + , rollback_replicated_for_() + , fragment_unit_() + , fragment_size_() + , bytes_certified_() + , unit_counter_() + { } + + void enable(enum fragment_unit fragment_unit, size_t fragment_size) + { + assert(fragment_size > 0); + fragment_unit_ = fragment_unit; + fragment_size_ = fragment_size; + } + + enum fragment_unit fragment_unit() const { return fragment_unit_; } + + size_t fragment_size() const { return fragment_size_; } + + void disable() + { + fragment_size_ = 0; + } + + void certified(wsrep::seqno seqno) + { + fragments_.push_back(seqno); + } + + size_t fragments_certified() const + { + return fragments_.size(); + } + + size_t bytes_certified() const + { + return bytes_certified_; + } + + void rolled_back(wsrep::transaction_id id) + { + assert(rollback_replicated_for_ == wsrep::transaction_id::invalid()); + rollback_replicated_for_ = id; + } + + bool rolled_back() const + { + return (rollback_replicated_for_ != + wsrep::transaction_id::invalid()); + } + + size_t unit_counter() const { return unit_counter_; } + void increment_unit_counter(size_t inc) + { unit_counter_ += inc; } + void reset_unit_counter() { unit_counter_ = 0; } + const std::vector& fragments() const + { + return fragments_; + } + void cleanup() + { + fragments_.clear(); + rollback_replicated_for_ = wsrep::transaction_id::invalid(); + bytes_certified_ = 0; + unit_counter_ = 0; + } + private: + std::vector fragments_; + wsrep::transaction_id rollback_replicated_for_; + enum fragment_unit fragment_unit_; + size_t fragment_size_; + size_t bytes_certified_; + size_t unit_counter_; + }; +} + +#endif // WSREP_STREAMING_CONTEXT_HPP diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index 43a9795..595ec0f 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -8,6 +8,7 @@ #include "provider.hpp" #include "server_context.hpp" #include "transaction_id.hpp" +#include "streaming_context.hpp" #include "lock.hpp" #include @@ -154,89 +155,8 @@ namespace wsrep int flags_; bool pa_unsafe_; bool certified_; - public: - class streaming_context - { - public: - enum fragment_unit - { - row, - bytes, - statement - }; - - streaming_context() - : fragments_() - , rollback_replicated_for_() - , fragment_unit_() - , fragment_size_() - , bytes_certified_() - , unit_counter_() - { } - - void enable(enum fragment_unit fragment_unit, size_t fragment_size) - { - assert(fragment_size > 0); - fragment_unit_ = fragment_unit; - fragment_size_ = fragment_size; - } - - enum fragment_unit fragment_unit() const { return fragment_unit_; } - - size_t fragment_size() const { return fragment_size_; } - - void disable() - { - fragment_size_ = 0; - } - - void certified(wsrep::seqno seqno) - { - fragments_.push_back(seqno); - } - - size_t fragments_certified() const - { - return fragments_.size(); - } - - size_t bytes_certified() const - { - return bytes_certified_; - } - - void rolled_back(wsrep::transaction_id id) - { - assert(rollback_replicated_for_ == wsrep::transaction_id::invalid()); - rollback_replicated_for_ = id; - } - - bool rolled_back() const - { - return (rollback_replicated_for_ != - wsrep::transaction_id::invalid()); - } - - size_t unit_counter() const { return unit_counter_; } - void increment_unit_counter(size_t inc) - { unit_counter_ += inc; } - void reset_unit_counter() { unit_counter_ = 0; } - void cleanup() - { - fragments_.clear(); - rollback_replicated_for_ = wsrep::transaction_id::invalid(); - bytes_certified_ = 0; - unit_counter_ = 0; - } - private: - std::vector fragments_; - wsrep::transaction_id rollback_replicated_for_; - enum fragment_unit fragment_unit_; - size_t fragment_size_; - size_t bytes_certified_; - size_t unit_counter_; - } streaming_context_; + wsrep::streaming_context streaming_context_; }; static inline std::string to_string(enum wsrep::transaction_context::state state) diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 26c2c7e..09b8cdf 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -764,7 +764,6 @@ int wsrep::transaction_context::certify_commit( { assert(lock.owns_lock()); assert(active()); - client_context_.wait_for_replayers(lock); assert(lock.owns_lock()); diff --git a/test/client_context_fixture.hpp b/test/client_context_fixture.hpp index e032557..31871a9 100644 --- a/test/client_context_fixture.hpp +++ b/test/client_context_fixture.hpp @@ -162,7 +162,7 @@ namespace // Verify initial state BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1); + cc.enable_streaming(wsrep::streaming_context::row, 1); } wsrep::fake_server_context sc; wsrep::fake_client_context cc; @@ -182,7 +182,7 @@ namespace // Verify initial state BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.enable_streaming(wsrep::transaction_context::streaming_context::bytes, 1); + cc.enable_streaming(wsrep::streaming_context::bytes, 1); } wsrep::fake_server_context sc; wsrep::fake_client_context cc; @@ -202,7 +202,7 @@ namespace // Verify initial state BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1); + cc.enable_streaming(wsrep::streaming_context::row, 1); } wsrep::fake_server_context sc; diff --git a/test/transaction_context_test.cpp b/test/transaction_context_test.cpp index 0ca6456..c70c30b 100644 --- a/test/transaction_context_test.cpp +++ b/test/transaction_context_test.cpp @@ -924,7 +924,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_batch_streaming_1pc_commit, streaming_client_fixture_row) { BOOST_REQUIRE(cc.enable_streaming( - wsrep::transaction_context::streaming_context::row, 2) == 0); + wsrep::streaming_context::row, 2) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); @@ -1067,7 +1067,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit, { BOOST_REQUIRE( cc.enable_streaming( - wsrep::transaction_context::streaming_context::bytes, 2) == 0); + wsrep::streaming_context::bytes, 2) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); cc.bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); @@ -1090,7 +1090,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_1pc_commit, { BOOST_REQUIRE( cc.enable_streaming( - wsrep::transaction_context::streaming_context::statement, 1) == 0); + wsrep::streaming_context::statement, 1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); @@ -1111,7 +1111,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_statement_batch_streaming_1pc_commit { BOOST_REQUIRE( cc.enable_streaming( - wsrep::transaction_context::streaming_context::statement, 2) == 0); + wsrep::streaming_context::statement, 2) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); @@ -1137,7 +1137,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_cert_fail, { BOOST_REQUIRE( cc.enable_streaming( - wsrep::transaction_context::streaming_context::statement, 1) == 0); + wsrep::streaming_context::statement, 1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);