diff --git a/dbsim/dbms_simulator.cpp b/dbsim/dbms_simulator.cpp index 38a71c7..4d1e9b9 100644 --- a/dbsim/dbms_simulator.cpp +++ b/dbsim/dbms_simulator.cpp @@ -393,6 +393,7 @@ private: } void wait_for_replayers(wsrep::unique_lock&) override { } + size_t bytes_generated() const { return 0; } int prepare_data_for_replication(const wsrep::transaction_context&) override { return 0; } diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index cf9d130..040e000 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -46,7 +46,7 @@ #include "lock.hpp" #include "buffer.hpp" #include "thread.hpp" - +#include "logger.hpp" namespace wsrep { @@ -241,12 +241,23 @@ namespace wsrep transaction_.streaming_context_ = transaction.streaming_context_; } - void enable_streaming( - enum wsrep::transaction_context::streaming_context::fragment_unit fragment_unit, + int enable_streaming( + enum wsrep::transaction_context::streaming_context::fragment_unit + fragment_unit, size_t fragment_size) { + if (transaction_.active() && + transaction_.streaming_context_.fragment_unit() != + fragment_unit) + { + wsrep::log_error() + << "Changing fragment unit for active transaction " + << "not allowed"; + return 1; + } transaction_.streaming_context_.enable( fragment_unit, fragment_size); + return 0; } int append_key(const wsrep::key& key) { @@ -489,6 +500,7 @@ namespace wsrep virtual int prepare_data_for_replication( const wsrep::transaction_context&) = 0; + virtual size_t bytes_generated() const = 0; virtual int prepare_fragment_for_replication( const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0; /*! diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index a51331f..43a9795 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -219,8 +219,9 @@ namespace wsrep } size_t unit_counter() const { return unit_counter_; } - void increment_unit_counter() { ++unit_counter_; } - + void increment_unit_counter(size_t inc) + { unit_counter_ += inc; } + void reset_unit_counter() { unit_counter_ = 0; } void cleanup() { fragments_.clear(); diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 6955218..26c2c7e 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -111,16 +111,25 @@ int wsrep::transaction_context::after_row() switch (streaming_context_.fragment_unit()) { case streaming_context::row: - streaming_context_.increment_unit_counter(); - if (streaming_context_.fragments_certified() - + streaming_context_.unit_counter() >= + streaming_context_.increment_unit_counter(1); + if (streaming_context_.unit_counter() >= streaming_context_.fragment_size()) + { + streaming_context_.reset_unit_counter(); + return certify_fragment(lock); + } + break; + case streaming_context::bytes: + if (client_context_.bytes_generated() >= + streaming_context_.bytes_certified() + + streaming_context_.fragment_size()) { return certify_fragment(lock); } break; - default: - assert(0); + case streaming_context::statement: + // This case is checked in after_statement() + break; } } return 0; @@ -463,6 +472,18 @@ int wsrep::transaction_context::after_statement() state() == s_cert_failed || state() == s_must_replay); + if (state() == s_executing && + streaming_context_.fragment_size() && + streaming_context_.fragment_unit() == streaming_context::statement) + { + streaming_context_.increment_unit_counter(1); + if (streaming_context_.unit_counter() >= streaming_context_.fragment_size()) + { + streaming_context_.reset_unit_counter(); + ret = certify_fragment(lock); + } + } + switch (state()) { case s_executing: diff --git a/test/client_context_fixture.hpp b/test/client_context_fixture.hpp index 3d2244d..e032557 100644 --- a/test/client_context_fixture.hpp +++ b/test/client_context_fixture.hpp @@ -182,7 +182,7 @@ namespace // Verify initial state BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1); + cc.enable_streaming(wsrep::transaction_context::streaming_context::bytes, 1); } wsrep::fake_server_context sc; wsrep::fake_client_context cc; diff --git a/test/fake_client_context.hpp b/test/fake_client_context.hpp index 828634a..747f51d 100644 --- a/test/fake_client_context.hpp +++ b/test/fake_client_context.hpp @@ -33,6 +33,7 @@ namespace wsrep , killed_before_certify_() , sync_point_enabled_() , sync_point_action_() + , bytes_generated_() , replays_() , aborts_() { } @@ -87,6 +88,11 @@ namespace wsrep return transaction_.append_data(data); } + size_t bytes_generated() const + { + return bytes_generated_; + } + int prepare_fragment_for_replication(const wsrep::transaction_context&, wsrep::mutable_buffer& buffer) WSREP_OVERRIDE @@ -142,6 +148,7 @@ namespace wsrep { spa_bf_abort } sync_point_action_; + size_t bytes_generated_; private: size_t replays_; size_t aborts_; diff --git a/test/transaction_context_test.cpp b/test/transaction_context_test.cpp index 48d054f..0ca6456 100644 --- a/test/transaction_context_test.cpp +++ b/test/transaction_context_test.cpp @@ -917,6 +917,28 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_1pc_commit, BOOST_REQUIRE(sc.provider().commit_fragments() == 1); } +// +// Test 1PC with row streaming with one row +// +BOOST_FIXTURE_TEST_CASE(transaction_context_row_batch_streaming_1pc_commit, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.enable_streaming( + wsrep::transaction_context::streaming_context::row, 2) == 0); + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); +} + // // Test 1PC row streaming with two separate statements // @@ -1005,7 +1027,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_cert_fail_commit, BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing, streaming_client_fixture_row) { - cc.debug_log_level(1); BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); @@ -1025,21 +1046,105 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing, -BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit, - streaming_client_fixture_byte) -{ - -} - BOOST_FIXTURE_TEST_CASE(transaction_context_byte_streaming_1pc_commit, streaming_client_fixture_byte) { + BOOST_REQUIRE(cc.start_transaction(1) == 0); + cc.bytes_generated_ = 1; + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); +} +BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit, + streaming_client_fixture_byte) +{ + BOOST_REQUIRE( + cc.enable_streaming( + wsrep::transaction_context::streaming_context::bytes, 2) == 0); + BOOST_REQUIRE(cc.start_transaction(1) == 0); + cc.bytes_generated_ = 1; + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + cc.bytes_generated_ = 2; + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); } BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_1pc_commit, streaming_client_fixture_statement) { - + BOOST_REQUIRE( + cc.enable_streaming( + wsrep::transaction_context::streaming_context::statement, 1) == 0); + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); +} + +BOOST_FIXTURE_TEST_CASE(transaction_context_statement_batch_streaming_1pc_commit, + streaming_client_fixture_statement) +{ + BOOST_REQUIRE( + cc.enable_streaming( + wsrep::transaction_context::streaming_context::statement, 2) == 0); + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); + BOOST_REQUIRE(cc.before_statement() == 0); + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE(cc.ordered_commit() == 0); + BOOST_REQUIRE(cc.after_commit() == 0); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success); + BOOST_REQUIRE(sc.provider().fragments() == 2); + BOOST_REQUIRE(sc.provider().start_fragments() == 1); + BOOST_REQUIRE(sc.provider().commit_fragments() == 1); +} + +BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_cert_fail, + streaming_client_fixture_row) +{ + BOOST_REQUIRE( + cc.enable_streaming( + wsrep::transaction_context::streaming_context::statement, 1) == 0); + BOOST_REQUIRE(cc.start_transaction(1) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); + sc.provider().certify_status_ = wsrep::provider::error_certification_failed; + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_error); + BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); + BOOST_REQUIRE(sc.provider().fragments() == 0); + BOOST_REQUIRE(sc.provider().start_fragments() == 0); + BOOST_REQUIRE(sc.provider().rollback_fragments() == 0); }