From 4ac15e4349372a7515d9e0cade59d76cdc9f941a Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Mon, 7 Jan 2019 11:45:28 +0100 Subject: [PATCH] Fix attempt to replicate empty fragments * Adds method wsrep::transaction::streaming_step() so that there is a single place where streaming context unit counter is udpated. The method also checks that some data has been generated before attempting fragment replication. * Emit a warning if there is an attempt to replicate a fragment and there is no data to replicate. --- include/wsrep/client_service.hpp | 15 +++++- include/wsrep/streaming_context.hpp | 10 ++++ include/wsrep/transaction.hpp | 1 + src/transaction.cpp | 81 ++++++++++++++++++----------- test/client_state_fixture.hpp | 2 +- test/mock_client_state.hpp | 6 +++ test/transaction_test.cpp | 32 ++++++++---- 7 files changed, 106 insertions(+), 41 deletions(-) diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index be4c7ac..b45fed2 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -86,8 +86,22 @@ namespace wsrep * otherwise false. */ virtual bool statement_allowed_for_streaming() const = 0; + + /** + * Return the total number of bytes generated by the transaction + * context. + */ virtual size_t bytes_generated() const = 0; + + /** + * Prepare a buffer containing data for the next fragment to replicate. + * + * @return Zero in case of success, non-zero on failure. + * If there is no data to replicate, the method shall return + * zero and leave the buffer empty. + */ virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; + /** * Remove fragments from the storage within current transaction. * Fragment removal will be committed once the current transaction @@ -115,7 +129,6 @@ namespace wsrep // // Interface to global server state // - /** * Forcefully shut down the DBMS process or replication system. * This may be called in situations where diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index bb220a0..b13f399 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -118,6 +118,11 @@ namespace wsrep return unit_counter_; } + void set_unit_counter(size_t count) + { + unit_counter_ = count; + } + void increment_unit_counter(size_t inc) { unit_counter_ += inc; @@ -133,6 +138,11 @@ namespace wsrep return fragments_; } + bool fragment_size_exceeded() const + { + return unit_counter_ >= fragment_size_; + } + void cleanup() { fragments_certified_ = 0; diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 10e534e..2c71c28 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -183,6 +183,7 @@ namespace wsrep wsrep::provider& provider(); void flags(int flags) { flags_ = flags; } + int streaming_step(wsrep::unique_lock&); int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); int append_sr_keys_for_commit(); diff --git a/src/transaction.cpp b/src/transaction.cpp index 96052a0..a1eb051 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -241,31 +241,10 @@ int wsrep::transaction::after_row() wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_row_enter"); int ret(0); - if (streaming_context_.fragment_size() > 0) + if (streaming_context_.fragment_size() && + streaming_context_.fragment_unit() != streaming_context::statement) { - switch (streaming_context_.fragment_unit()) - { - case streaming_context::row: - streaming_context_.increment_unit_counter(1); - if (streaming_context_.unit_counter() >= - streaming_context_.fragment_size()) - { - streaming_context_.reset_unit_counter(); - ret = certify_fragment(lock); - } - break; - case streaming_context::bytes: - if (client_service_.bytes_generated() >= - streaming_context_.bytes_certified() - + streaming_context_.fragment_size()) - { - ret = certify_fragment(lock); - } - break; - case streaming_context::statement: - // This case is checked in after_statement() - break; - } + ret = streaming_step(lock); } debug_log_state("after_row_leave"); return ret; @@ -692,12 +671,7 @@ int wsrep::transaction::after_statement() streaming_context_.fragment_size() && streaming_context_.fragment_unit() == streaming_context::statement) { - streaming_context_.increment_unit_counter(1); - if (streaming_context_.unit_counter() >= streaming_context_.fragment_size()) - { - streaming_context_.reset_unit_counter(); - ret = certify_fragment(lock); - } + ret = streaming_step(lock); } switch (state()) @@ -1019,6 +993,45 @@ void wsrep::transaction::state( } } +int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(streaming_context_.fragment_size()); + + int ret(0); + const ssize_t bytes_to_replicate(client_service_.bytes_generated() - + streaming_context_.bytes_certified()); + + switch (streaming_context_.fragment_unit()) + { + case streaming_context::row: + // fall through + case streaming_context::statement: + streaming_context_.increment_unit_counter(1); + break; + case streaming_context::bytes: + streaming_context_.set_unit_counter(bytes_to_replicate); + break; + } + + if (streaming_context_.fragment_size_exceeded()) + { + // Some statements have no effect. Do not atttempt to + // replicate a fragment if no data has been generated + // since last fragment replication. + if (bytes_to_replicate <= 0) + { + assert(bytes_to_replicate == 0); + return ret; + } + + streaming_context_.reset_unit_counter(); + ret = certify_fragment(lock); + } + + return ret; +} + int wsrep::transaction::certify_fragment( wsrep::unique_lock& lock) { @@ -1056,6 +1069,14 @@ int wsrep::transaction::certify_fragment( return 1; } + if (data.size() == 0) + { + wsrep::log_warning() << "Attempt to replicate empty data buffer"; + lock.lock(); + state(lock, s_executing); + return 0; + } + if (provider().append_data(ws_handle_, wsrep::const_buffer(data.data(), data.size()))) { diff --git a/test/client_state_fixture.hpp b/test/client_state_fixture.hpp index 44fca34..966dead 100644 --- a/test/client_state_fixture.hpp +++ b/test/client_state_fixture.hpp @@ -259,7 +259,7 @@ namespace // Verify initial state BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); - cc.enable_streaming(wsrep::streaming_context::row, 1); + cc.enable_streaming(wsrep::streaming_context::statement, 1); } wsrep::mock_server_service server_service; diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index eb7f282..7c11f7b 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -200,6 +200,12 @@ namespace wsrep : mock_client_state(server_state, *this, id, mode) , mock_client_service(static_cast(*this)) { } + + int after_row() + { + bytes_generated_++; + return wsrep::client_state::after_row(); + } }; } diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index f93e870..7e840ca 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -1190,10 +1190,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_batch_streaming_1pc_commit, cc.enable_streaming( wsrep::streaming_context::bytes, 2) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); - cc.bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); - cc.bytes_generated_ = 2; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); @@ -1206,12 +1204,31 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_batch_streaming_1pc_commit, } +BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_statement_with_no_effect, + streaming_client_fixture_statement) +{ + BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.after_statement() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.after_statement() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == 0); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); +} + BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_1pc_commit, streaming_client_fixture_statement) { - BOOST_REQUIRE( - cc.enable_streaming( - wsrep::streaming_context::statement, 1) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); @@ -1254,11 +1271,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_statement_batch_streaming_1pc_commit, } BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_cert_fail, - streaming_client_fixture_row) + streaming_client_fixture_statement) { - BOOST_REQUIRE( - cc.enable_streaming( - wsrep::streaming_context::statement, 1) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0);