1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Fix attempt to replicate empty fragments

* Adds method wsrep::transaction::streaming_step() so that there is a
  single place where streaming context unit counter is udpated.
  The method also checks that some data has been generated before
  attempting fragment replication.
* Emit a warning if there is an attempt to replicate a fragment and
  there is no data to replicate.
This commit is contained in:
Daniele Sciascia
2019-01-07 11:45:28 +01:00
parent a9e2fdccfc
commit 4ac15e4349
7 changed files with 106 additions and 41 deletions

View File

@ -86,8 +86,22 @@ namespace wsrep
* otherwise false. * otherwise false.
*/ */
virtual bool statement_allowed_for_streaming() const = 0; virtual bool statement_allowed_for_streaming() const = 0;
/**
* Return the total number of bytes generated by the transaction
* context.
*/
virtual size_t bytes_generated() const = 0; virtual size_t bytes_generated() const = 0;
/**
* Prepare a buffer containing data for the next fragment to replicate.
*
* @return Zero in case of success, non-zero on failure.
* If there is no data to replicate, the method shall return
* zero and leave the buffer empty.
*/
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0;
/** /**
* Remove fragments from the storage within current transaction. * Remove fragments from the storage within current transaction.
* Fragment removal will be committed once the current transaction * Fragment removal will be committed once the current transaction
@ -115,7 +129,6 @@ namespace wsrep
// //
// Interface to global server state // Interface to global server state
// //
/** /**
* Forcefully shut down the DBMS process or replication system. * Forcefully shut down the DBMS process or replication system.
* This may be called in situations where * This may be called in situations where

View File

@ -118,6 +118,11 @@ namespace wsrep
return unit_counter_; return unit_counter_;
} }
void set_unit_counter(size_t count)
{
unit_counter_ = count;
}
void increment_unit_counter(size_t inc) void increment_unit_counter(size_t inc)
{ {
unit_counter_ += inc; unit_counter_ += inc;
@ -133,6 +138,11 @@ namespace wsrep
return fragments_; return fragments_;
} }
bool fragment_size_exceeded() const
{
return unit_counter_ >= fragment_size_;
}
void cleanup() void cleanup()
{ {
fragments_certified_ = 0; fragments_certified_ = 0;

View File

@ -183,6 +183,7 @@ namespace wsrep
wsrep::provider& provider(); wsrep::provider& provider();
void flags(int flags) { flags_ = flags; } void flags(int flags) { flags_ = flags; }
int streaming_step(wsrep::unique_lock<wsrep::mutex>&);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&); int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&); int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int append_sr_keys_for_commit(); int append_sr_keys_for_commit();

View File

@ -241,31 +241,10 @@ int wsrep::transaction::after_row()
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex()); wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
debug_log_state("after_row_enter"); debug_log_state("after_row_enter");
int ret(0); int ret(0);
if (streaming_context_.fragment_size() > 0) if (streaming_context_.fragment_size() &&
streaming_context_.fragment_unit() != streaming_context::statement)
{ {
switch (streaming_context_.fragment_unit()) ret = streaming_step(lock);
{
case streaming_context::row:
streaming_context_.increment_unit_counter(1);
if (streaming_context_.unit_counter() >=
streaming_context_.fragment_size())
{
streaming_context_.reset_unit_counter();
ret = certify_fragment(lock);
}
break;
case streaming_context::bytes:
if (client_service_.bytes_generated() >=
streaming_context_.bytes_certified()
+ streaming_context_.fragment_size())
{
ret = certify_fragment(lock);
}
break;
case streaming_context::statement:
// This case is checked in after_statement()
break;
}
} }
debug_log_state("after_row_leave"); debug_log_state("after_row_leave");
return ret; return ret;
@ -692,12 +671,7 @@ int wsrep::transaction::after_statement()
streaming_context_.fragment_size() && streaming_context_.fragment_size() &&
streaming_context_.fragment_unit() == streaming_context::statement) streaming_context_.fragment_unit() == streaming_context::statement)
{ {
streaming_context_.increment_unit_counter(1); ret = streaming_step(lock);
if (streaming_context_.unit_counter() >= streaming_context_.fragment_size())
{
streaming_context_.reset_unit_counter();
ret = certify_fragment(lock);
}
} }
switch (state()) switch (state())
@ -1019,6 +993,45 @@ void wsrep::transaction::state(
} }
} }
int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
assert(streaming_context_.fragment_size());
int ret(0);
const ssize_t bytes_to_replicate(client_service_.bytes_generated() -
streaming_context_.bytes_certified());
switch (streaming_context_.fragment_unit())
{
case streaming_context::row:
// fall through
case streaming_context::statement:
streaming_context_.increment_unit_counter(1);
break;
case streaming_context::bytes:
streaming_context_.set_unit_counter(bytes_to_replicate);
break;
}
if (streaming_context_.fragment_size_exceeded())
{
// Some statements have no effect. Do not atttempt to
// replicate a fragment if no data has been generated
// since last fragment replication.
if (bytes_to_replicate <= 0)
{
assert(bytes_to_replicate == 0);
return ret;
}
streaming_context_.reset_unit_counter();
ret = certify_fragment(lock);
}
return ret;
}
int wsrep::transaction::certify_fragment( int wsrep::transaction::certify_fragment(
wsrep::unique_lock<wsrep::mutex>& lock) wsrep::unique_lock<wsrep::mutex>& lock)
{ {
@ -1056,6 +1069,14 @@ int wsrep::transaction::certify_fragment(
return 1; return 1;
} }
if (data.size() == 0)
{
wsrep::log_warning() << "Attempt to replicate empty data buffer";
lock.lock();
state(lock, s_executing);
return 0;
}
if (provider().append_data(ws_handle_, if (provider().append_data(ws_handle_,
wsrep::const_buffer(data.data(), data.size()))) wsrep::const_buffer(data.data(), data.size())))
{ {

View File

@ -259,7 +259,7 @@ namespace
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
cc.enable_streaming(wsrep::streaming_context::row, 1); cc.enable_streaming(wsrep::streaming_context::statement, 1);
} }
wsrep::mock_server_service server_service; wsrep::mock_server_service server_service;

View File

@ -200,6 +200,12 @@ namespace wsrep
: mock_client_state(server_state, *this, id, mode) : mock_client_state(server_state, *this, id, mode)
, mock_client_service(static_cast<mock_client_state&>(*this)) , mock_client_service(static_cast<mock_client_state&>(*this))
{ } { }
int after_row()
{
bytes_generated_++;
return wsrep::client_state::after_row();
}
}; };
} }

View File

@ -1190,10 +1190,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_batch_streaming_1pc_commit,
cc.enable_streaming( cc.enable_streaming(
wsrep::streaming_context::bytes, 2) == 0); wsrep::streaming_context::bytes, 2) == 0);
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
cc.bytes_generated_ = 1;
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0);
cc.bytes_generated_ = 2;
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);
@ -1206,12 +1204,31 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_batch_streaming_1pc_commit,
} }
BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_statement_with_no_effect,
streaming_client_fixture_statement)
{
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.after_statement() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.after_statement() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == 0);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_1pc_commit, BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_1pc_commit,
streaming_client_fixture_statement) streaming_client_fixture_statement)
{ {
BOOST_REQUIRE(
cc.enable_streaming(
wsrep::streaming_context::statement, 1) == 0);
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0);
@ -1254,11 +1271,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_statement_batch_streaming_1pc_commit,
} }
BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_cert_fail, BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_cert_fail,
streaming_client_fixture_row) streaming_client_fixture_statement)
{ {
BOOST_REQUIRE(
cc.enable_streaming(
wsrep::streaming_context::statement, 1) == 0);
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 0);