1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-27 09:01:50 +03:00

Bytes and statement streaming unit.

This commit is contained in:
Teemu Ollakka
2018-06-13 16:51:23 +03:00
parent 94b4d3a7db
commit 37efaba3e5
7 changed files with 166 additions and 19 deletions

View File

@ -393,6 +393,7 @@ private:
} }
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override
{ } { }
size_t bytes_generated() const { return 0; }
int prepare_data_for_replication(const wsrep::transaction_context&) int prepare_data_for_replication(const wsrep::transaction_context&)
override override
{ return 0; } { return 0; }

View File

@ -46,7 +46,7 @@
#include "lock.hpp" #include "lock.hpp"
#include "buffer.hpp" #include "buffer.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "logger.hpp"
namespace wsrep namespace wsrep
{ {
@ -241,12 +241,23 @@ namespace wsrep
transaction_.streaming_context_ = transaction.streaming_context_; transaction_.streaming_context_ = transaction.streaming_context_;
} }
void enable_streaming( int enable_streaming(
enum wsrep::transaction_context::streaming_context::fragment_unit fragment_unit, enum wsrep::transaction_context::streaming_context::fragment_unit
fragment_unit,
size_t fragment_size) size_t fragment_size)
{ {
if (transaction_.active() &&
transaction_.streaming_context_.fragment_unit() !=
fragment_unit)
{
wsrep::log_error()
<< "Changing fragment unit for active transaction "
<< "not allowed";
return 1;
}
transaction_.streaming_context_.enable( transaction_.streaming_context_.enable(
fragment_unit, fragment_size); fragment_unit, fragment_size);
return 0;
} }
int append_key(const wsrep::key& key) int append_key(const wsrep::key& key)
{ {
@ -489,6 +500,7 @@ namespace wsrep
virtual int prepare_data_for_replication( virtual int prepare_data_for_replication(
const wsrep::transaction_context&) = 0; const wsrep::transaction_context&) = 0;
virtual size_t bytes_generated() const = 0;
virtual int prepare_fragment_for_replication( virtual int prepare_fragment_for_replication(
const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0; const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0;
/*! /*!

View File

@ -219,8 +219,9 @@ namespace wsrep
} }
size_t unit_counter() const { return unit_counter_; } size_t unit_counter() const { return unit_counter_; }
void increment_unit_counter() { ++unit_counter_; } void increment_unit_counter(size_t inc)
{ unit_counter_ += inc; }
void reset_unit_counter() { unit_counter_ = 0; }
void cleanup() void cleanup()
{ {
fragments_.clear(); fragments_.clear();

View File

@ -111,16 +111,25 @@ int wsrep::transaction_context::after_row()
switch (streaming_context_.fragment_unit()) switch (streaming_context_.fragment_unit())
{ {
case streaming_context::row: case streaming_context::row:
streaming_context_.increment_unit_counter(); streaming_context_.increment_unit_counter(1);
if (streaming_context_.fragments_certified() if (streaming_context_.unit_counter() >=
+ streaming_context_.unit_counter() >=
streaming_context_.fragment_size()) streaming_context_.fragment_size())
{
streaming_context_.reset_unit_counter();
return certify_fragment(lock);
}
break;
case streaming_context::bytes:
if (client_context_.bytes_generated() >=
streaming_context_.bytes_certified()
+ streaming_context_.fragment_size())
{ {
return certify_fragment(lock); return certify_fragment(lock);
} }
break; break;
default: case streaming_context::statement:
assert(0); // This case is checked in after_statement()
break;
} }
} }
return 0; return 0;
@ -463,6 +472,18 @@ int wsrep::transaction_context::after_statement()
state() == s_cert_failed || state() == s_cert_failed ||
state() == s_must_replay); state() == s_must_replay);
if (state() == s_executing &&
streaming_context_.fragment_size() &&
streaming_context_.fragment_unit() == streaming_context::statement)
{
streaming_context_.increment_unit_counter(1);
if (streaming_context_.unit_counter() >= streaming_context_.fragment_size())
{
streaming_context_.reset_unit_counter();
ret = certify_fragment(lock);
}
}
switch (state()) switch (state())
{ {
case s_executing: case s_executing:

View File

@ -182,7 +182,7 @@ namespace
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing);
cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1); cc.enable_streaming(wsrep::transaction_context::streaming_context::bytes, 1);
} }
wsrep::fake_server_context sc; wsrep::fake_server_context sc;
wsrep::fake_client_context cc; wsrep::fake_client_context cc;

View File

@ -33,6 +33,7 @@ namespace wsrep
, killed_before_certify_() , killed_before_certify_()
, sync_point_enabled_() , sync_point_enabled_()
, sync_point_action_() , sync_point_action_()
, bytes_generated_()
, replays_() , replays_()
, aborts_() , aborts_()
{ } { }
@ -87,6 +88,11 @@ namespace wsrep
return transaction_.append_data(data); return transaction_.append_data(data);
} }
size_t bytes_generated() const
{
return bytes_generated_;
}
int prepare_fragment_for_replication(const wsrep::transaction_context&, int prepare_fragment_for_replication(const wsrep::transaction_context&,
wsrep::mutable_buffer& buffer) wsrep::mutable_buffer& buffer)
WSREP_OVERRIDE WSREP_OVERRIDE
@ -142,6 +148,7 @@ namespace wsrep
{ {
spa_bf_abort spa_bf_abort
} sync_point_action_; } sync_point_action_;
size_t bytes_generated_;
private: private:
size_t replays_; size_t replays_;
size_t aborts_; size_t aborts_;

View File

@ -917,6 +917,28 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_1pc_commit,
BOOST_REQUIRE(sc.provider().commit_fragments() == 1); BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
} }
//
// Test 1PC with row streaming with one row
//
BOOST_FIXTURE_TEST_CASE(transaction_context_row_batch_streaming_1pc_commit,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.enable_streaming(
wsrep::transaction_context::streaming_context::row, 2) == 0);
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
// //
// Test 1PC row streaming with two separate statements // Test 1PC row streaming with two separate statements
// //
@ -1005,7 +1027,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_cert_fail_commit,
BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing, BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing,
streaming_client_fixture_row) streaming_client_fixture_row)
{ {
cc.debug_log_level(1);
BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
@ -1025,21 +1046,105 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing,
BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit,
streaming_client_fixture_byte)
{
}
BOOST_FIXTURE_TEST_CASE(transaction_context_byte_streaming_1pc_commit, BOOST_FIXTURE_TEST_CASE(transaction_context_byte_streaming_1pc_commit,
streaming_client_fixture_byte) streaming_client_fixture_byte)
{ {
BOOST_REQUIRE(cc.start_transaction(1) == 0);
cc.bytes_generated_ = 1;
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit,
streaming_client_fixture_byte)
{
BOOST_REQUIRE(
cc.enable_streaming(
wsrep::transaction_context::streaming_context::bytes, 2) == 0);
BOOST_REQUIRE(cc.start_transaction(1) == 0);
cc.bytes_generated_ = 1;
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
cc.bytes_generated_ = 2;
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
} }
BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_1pc_commit, BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_1pc_commit,
streaming_client_fixture_statement) streaming_client_fixture_statement)
{ {
BOOST_REQUIRE(
cc.enable_streaming(
wsrep::transaction_context::streaming_context::statement, 1) == 0);
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_statement_batch_streaming_1pc_commit,
streaming_client_fixture_statement)
{
BOOST_REQUIRE(
cc.enable_streaming(
wsrep::transaction_context::streaming_context::statement, 2) == 0);
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_cert_fail,
streaming_client_fixture_row)
{
BOOST_REQUIRE(
cc.enable_streaming(
wsrep::transaction_context::streaming_context::statement, 1) == 0);
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
sc.provider().certify_status_ = wsrep::provider::error_certification_failed;
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_error);
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
BOOST_REQUIRE(sc.provider().fragments() == 0);
BOOST_REQUIRE(sc.provider().start_fragments() == 0);
BOOST_REQUIRE(sc.provider().rollback_fragments() == 0);
} }