diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 6238ace..f7b3fcd 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -286,7 +286,7 @@ namespace wsrep { assert(mode_ == m_local); assert(state_ == s_exec); - return (transaction_.streaming_context_.fragment_size() ? + return (transaction_.streaming_context().fragment_size() ? transaction_.after_row() : 0); } @@ -307,11 +307,10 @@ namespace wsrep fragment_unit, size_t fragment_size); - void disable_streaming() - { - assert(state_ == s_exec && mode_ == m_local); - transaction_.streaming_context_.disable(); - } + /** + * Disable streaming for context. + */ + void disable_streaming(); /** * Prepare write set meta data for fragment storage ordering. @@ -426,7 +425,7 @@ namespace wsrep { assert(mode_ == m_high_priority); transaction_.start_transaction(transaction.id()); - transaction_.streaming_context_ = transaction.streaming_context_; + transaction_.streaming_context() = transaction.streaming_context(); } /** @name Non-transactional operations */ diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 8eaeca5..7af1ac2 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -135,6 +135,10 @@ namespace wsrep wsrep::ws_handle& ws_handle() { return ws_handle_; } const wsrep::ws_handle& ws_handle() const { return ws_handle_; } const wsrep::ws_meta& ws_meta() const { return ws_meta_; } + const wsrep::streaming_context& streaming_context() const + { return streaming_context_; } + wsrep::streaming_context& streaming_context() + { return streaming_context_; } private: transaction(const transaction&); transaction operator=(const transaction&); @@ -162,7 +166,6 @@ namespace wsrep int flags_; bool pa_unsafe_; bool certified_; - public: wsrep::streaming_context streaming_context_; }; diff --git a/src/client_state.cpp b/src/client_state.cpp index 15ea948..e051a6a 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -218,26 +218,40 @@ int wsrep::client_state::after_statement() return 0; } +////////////////////////////////////////////////////////////////////////////// +// Streaming // +////////////////////////////////////////////////////////////////////////////// + int wsrep::client_state::enable_streaming( enum wsrep::streaming_context::fragment_unit fragment_unit, size_t fragment_size) { assert(mode_ == m_local); - if (transaction_.active() && - transaction_.streaming_context_.fragment_unit() != + if (transaction_.streaming_context().fragments_certified() && + transaction_.streaming_context().fragment_unit() != fragment_unit) { wsrep::log_error() - << "Changing fragment unit for active transaction " + << "Changing fragment unit for active streaming transaction " << "not allowed"; return 1; } - transaction_.streaming_context_.enable( - fragment_unit, fragment_size); + transaction_.streaming_context().enable(fragment_unit, fragment_size); return 0; } +void wsrep::client_state::disable_streaming() +{ + assert(state_ == s_exec && mode_ == m_local); + transaction_.streaming_context().disable(); +} + +////////////////////////////////////////////////////////////////////////////// +// TOI // +////////////////////////////////////////////////////////////////////////////// + + int wsrep::client_state::enter_toi(const wsrep::key_array& keys, const wsrep::const_buffer& buffer, int flags) @@ -304,6 +318,10 @@ int wsrep::client_state::leave_toi() return ret; } +/////////////////////////////////////////////////////////////////////////////// +// RSU // +/////////////////////////////////////////////////////////////////////////////// + int wsrep::client_state::begin_rsu(int timeout) { if (server_state_.desync()) @@ -349,6 +367,10 @@ int wsrep::client_state::end_rsu() return ret; } +/////////////////////////////////////////////////////////////////////////////// +// TOI // +/////////////////////////////////////////////////////////////////////////////// + int wsrep::client_state::sync_wait(int timeout) { std::pair result( @@ -370,7 +392,9 @@ int wsrep::client_state::sync_wait(int timeout) return ret; } -// Private +/////////////////////////////////////////////////////////////////////////////// +// Private // +/////////////////////////////////////////////////////////////////////////////// void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid) { diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 4fbf8df..19fcb22 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -982,7 +982,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_1pc_commit, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.after_commit() == 0); @@ -1002,9 +1002,9 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_batch_streaming_1pc_commit, wsrep::streaming_context::row, 2) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.after_commit() == 0); @@ -1023,11 +1023,11 @@ BOOST_FIXTURE_TEST_CASE( { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.after_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 2); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 2); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.after_commit() == 0); @@ -1045,7 +1045,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_rollback, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_rollback() == 0); BOOST_REQUIRE(cc.after_rollback() == 0); BOOST_REQUIRE(cc.after_statement() == 0); @@ -1062,7 +1062,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_non_commit, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); sc.provider().certify_result_ = wsrep::provider::error_certification_failed; BOOST_REQUIRE(cc.after_row() == 1); sc.provider().certify_result_ = wsrep::provider::success; @@ -1082,7 +1082,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_commit, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); sc.provider().certify_result_ = wsrep::provider::error_certification_failed; BOOST_REQUIRE(cc.before_commit() == 1); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_cert_failed); @@ -1104,7 +1104,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_committing, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); wsrep_test::bf_abort_ordered(cc); @@ -1127,7 +1127,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_streaming_1pc_commit, BOOST_REQUIRE(cc.start_transaction(1) == 0); cc.bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.after_commit() == 0); @@ -1146,10 +1146,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_batch_streaming_1pc_commit, BOOST_REQUIRE(cc.start_transaction(1) == 0); cc.bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); cc.bytes_generated_ = 2; BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.after_commit() == 0); @@ -1168,9 +1168,9 @@ BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_1pc_commit, wsrep::streaming_context::statement, 1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(cc.after_statement() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); @@ -1189,14 +1189,14 @@ BOOST_FIXTURE_TEST_CASE(transaction_statement_batch_streaming_1pc_commit, wsrep::streaming_context::statement, 2) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(cc.after_statement() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(cc.after_statement() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0); @@ -1215,7 +1215,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_cert_fail, wsrep::streaming_context::statement, 1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); sc.provider().certify_result_ = wsrep::provider::error_certification_failed; BOOST_REQUIRE(cc.after_statement()); BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); diff --git a/test/transaction_test_2pc.cpp b/test/transaction_test_2pc.cpp index 8ada781..0c85041 100644 --- a/test/transaction_test_2pc.cpp +++ b/test/transaction_test_2pc.cpp @@ -189,7 +189,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_streaming_2pc_commit, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(cc.before_commit() == 0); @@ -206,11 +206,11 @@ BOOST_FIXTURE_TEST_CASE(transaction_streaming_2pc_commit_two_statements, { BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.after_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.after_row() == 0); - BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 2); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 2); BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(cc.before_commit() == 0);