1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-30 07:23:07 +03:00

More unit tests for streaming replication.

This commit is contained in:
Teemu Ollakka
2018-06-13 14:13:55 +03:00
parent f07885e204
commit 6677e3cfd8
10 changed files with 162 additions and 1239 deletions

View File

@ -229,6 +229,12 @@ namespace wsrep
return transaction_.start_transaction(wsh, meta);
}
int start_replaying()
{
assert(mode_ == m_applier);
return transaction_.start_replaying();
}
void adopt_transaction(wsrep::transaction_context& transaction)
{
transaction_.start_transaction(transaction.id());

View File

@ -92,6 +92,8 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta);
int start_replaying();
int append_key(const wsrep::key&);
int append_data(const wsrep::const_buffer&);

View File

@ -136,6 +136,11 @@ int wsrep::server_context::on_apply(
assert(txc.active() == false);
client_context.start_transaction(ws_handle, ws_meta);
}
else
{
client_context.start_replaying();
}
if (client_context.apply(data))
{
ret = 1;

View File

@ -79,6 +79,17 @@ int wsrep::transaction_context::start_transaction(
return 0;
}
int wsrep::transaction_context::start_replaying()
{
assert(ws_meta_.flags() & wsrep::provider::flag::commit);
assert(active());
assert(client_context_.mode() == wsrep::client_context::m_applier);
assert(state() == s_replaying);
assert(ws_handle_.opaque());
assert(ws_meta_.seqno().nil() == false);
certified_ = true;
return 0;
}
int wsrep::transaction_context::append_key(const wsrep::key& key)
{
@ -255,6 +266,7 @@ int wsrep::transaction_context::before_commit()
}
if (ret == 0)
{
assert(certified());
assert(ordered());
lock.unlock();
enum wsrep::provider::status
@ -280,6 +292,7 @@ int wsrep::transaction_context::before_commit()
}
break;
case wsrep::client_context::m_applier:
assert(certified());
assert(ordered());
ret = provider_.commit_order_enter(ws_handle_, ws_meta_);
if (ret)
@ -593,7 +606,8 @@ void wsrep::transaction_context::state(
{
log_debug() << "client: " << client_context_.id().get()
<< " txc: " << id().get()
<< " state: " << state_ << " -> " << next_state;
<< " state: " << to_string(state_)
<< " -> " << to_string(next_state);
}
assert(lock.owns_lock());
static const char allowed[n_states][n_states] =

File diff suppressed because it is too large Load Diff

View File

@ -119,5 +119,46 @@ namespace
wsrep::fake_client_context cc;
const wsrep::transaction_context& tc;
};
struct streaming_client_fixture_byte
{
streaming_client_fixture_byte()
: sc("s1", "s1", wsrep::server_context::rm_sync)
, cc(sc, wsrep::client_id(1),
wsrep::client_context::m_replicating)
, tc(cc.transaction())
{
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing);
cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1);
}
wsrep::fake_server_context sc;
wsrep::fake_client_context cc;
const wsrep::transaction_context& tc;
};
struct streaming_client_fixture_statement
{
streaming_client_fixture_statement()
: sc("s1", "s1", wsrep::server_context::rm_sync)
, cc(sc, wsrep::client_id(1),
wsrep::client_context::m_replicating)
, tc(cc.transaction())
{
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing);
cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1);
}
wsrep::fake_server_context sc;
wsrep::fake_client_context cc;
const wsrep::transaction_context& tc;
};
}
#endif // WSREP_TEST_CLIENT_CONTEXT_FIXTURE_HPP

View File

@ -10,7 +10,8 @@ int wsrep::fake_client_context::apply(
const wsrep::const_buffer& data __attribute__((unused)))
{
assert(transaction_.state() == wsrep::transaction_context::s_executing);
assert(transaction_.state() == wsrep::transaction_context::s_executing ||
transaction_.state() == wsrep::transaction_context::s_replaying);
return (fail_next_applying_ ? 1 : 0);
}

View File

@ -56,10 +56,12 @@ namespace wsrep
void will_replay(wsrep::transaction_context&) WSREP_OVERRIDE { }
int replay(wsrep::transaction_context& tc) WSREP_OVERRIDE
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
tc.state(lock, wsrep::transaction_context::s_committing);
tc.state(lock, wsrep::transaction_context::s_ordered_commit);
tc.state(lock, wsrep::transaction_context::s_committed);
wsrep::client_applier_mode am(*this);
if (server_context().on_apply(
*this, tc.ws_handle(), tc.ws_meta(), wsrep::const_buffer()))
{
return 1;
}
++replays_;
return 0;
}

View File

@ -52,6 +52,7 @@ namespace wsrep
int flags,
wsrep::ws_meta& ws_meta)
{
ws_handle = wsrep::ws_handle(ws_handle.transaction_id(), (void*)1);
wsrep::log_info() << "provider certify: "
<< "client: " << client_id.get()
<< " flags: " << std::hex << flags

View File

@ -899,7 +899,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_applying_rollback,
// STREAMING REPLICATION //
///////////////////////////////////////////////////////////////////////////////
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit,
//
// Test 1PC with row streaming with one row
//
BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_1pc_commit,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
@ -914,9 +917,11 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit,
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit_two_statements,
//
// Test 1PC row streaming with two separate statements
//
BOOST_FIXTURE_TEST_CASE(
transaction_context_row_streaming_1pc_commit_two_statements,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
@ -933,11 +938,12 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit_two_statements,
BOOST_REQUIRE(sc.provider().fragments() == 3);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_rollback,
//
// Test streaming rollback
//
BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_rollback,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
@ -951,7 +957,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_rollback,
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_cert_fail_non_commit,
//
// Test streaming certification failure during fragment replication
//
BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_cert_fail_non_commit,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
@ -968,3 +977,69 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_cert_fail_non_commit,
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
}
//
// Test streaming certification failure during commit
//
BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_cert_fail_commit,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
sc.provider().certify_status_ = wsrep::provider::error_certification_failed;
BOOST_REQUIRE(cc.before_commit() == 1);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_cert_failed);
sc.provider().certify_status_ = wsrep::provider::success;
BOOST_REQUIRE(cc.before_rollback() == 0);
BOOST_REQUIRE(cc.after_rollback() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_error);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_aborted);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
}
//
// Test streaming BF abort after succesful certification
//
BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing,
streaming_client_fixture_row)
{
cc.debug_log_level(1);
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_committing);
wsrep_test::bf_abort_ordered(cc);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_must_abort);
BOOST_REQUIRE(cc.before_rollback() == 0);
BOOST_REQUIRE(cc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_must_replay);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_committed);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit,
streaming_client_fixture_byte)
{
}
BOOST_FIXTURE_TEST_CASE(transaction_context_byte_streaming_1pc_commit,
streaming_client_fixture_byte)
{
}
BOOST_FIXTURE_TEST_CASE(transaction_context_statement_streaming_1pc_commit,
streaming_client_fixture_statement)
{
}