1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-31 18:24:25 +03:00

Unit tests for SR with two statements, SR rollback.

This commit is contained in:
Teemu Ollakka
2018-06-13 10:18:46 +03:00
parent 292072bf56
commit 265d9b3322
10 changed files with 140 additions and 16 deletions

View File

@ -265,6 +265,11 @@ public:
// Client context management
wsrep::client_context* local_client_context();
wsrep::client_context& streaming_applier_client_context(
const wsrep::id&, const wsrep::transaction_id&) override
{
throw wsrep::not_implemented_error();
}
size_t next_transaction_id()
{
return (last_transaction_id_.fetch_add(1) + 1);

View File

@ -229,6 +229,12 @@ namespace wsrep
return transaction_.start_transaction(wsh, meta);
}
void adopt_transaction(wsrep::transaction_context& transaction)
{
transaction_.start_transaction(transaction.id());
transaction_.streaming_context_ = transaction.streaming_context_;
}
void enable_streaming(
enum wsrep::transaction_context::streaming_context::fragment_unit fragment_unit,
size_t fragment_size)

View File

@ -30,6 +30,9 @@ namespace wsrep
}
};
class fatal_error : public std::exception
{
};
}

View File

@ -66,6 +66,7 @@
#include <vector>
#include <string>
#include <map>
namespace wsrep
{
@ -74,7 +75,9 @@ namespace wsrep
class ws_meta;
class provider;
class client_context;
class transaction_id;
class transaction_context;
class id;
class gtid;
class view;
class const_buffer;
@ -198,6 +201,23 @@ namespace wsrep
*/
virtual client_context* local_client_context() = 0;
/*!
* Create applier context for streaming transaction.
*
* \param server_id Server id of the origin of the SR transaction.
* \param transaction_id Transaction ID of the SR transaction on the
* origin server.
*/
virtual client_context& streaming_applier_client_context(
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id
) = 0;
void insert_streaming_applier(
const wsrep::id&,
const wsrep::transaction_id&,
wsrep::client_context* client_context);
/*!
* Load WSRep provider.
*
@ -383,6 +403,7 @@ namespace wsrep
, cond_(cond)
, state_(s_disconnected)
, state_waiters_(n_states_)
, streaming_appliers_()
, provider_()
, name_(name)
, id_(id)
@ -403,6 +424,8 @@ namespace wsrep
wsrep::condition_variable& cond_;
enum state state_;
mutable std::vector<int> state_waiters_;
typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>, wsrep::client_context*> streaming_appliers_map;
streaming_appliers_map streaming_appliers_;
wsrep::provider* provider_;
std::string name_;
std::string id_;

View File

@ -135,6 +135,7 @@ namespace wsrep
void flags(int flags) { flags_ = flags; }
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
void streaming_rollback();
void clear_fragments();
void cleanup();
void debug_log_state(const char*) const;

View File

@ -8,6 +8,7 @@
#include "wsrep/view.hpp"
#include "wsrep/logger.hpp"
#include "wsrep/compiler.hpp"
#include "wsrep/id.hpp"
#include <cassert>
#include <sstream>
@ -178,6 +179,21 @@ bool wsrep::server_context::statement_allowed_for_streaming(
return false;
}
void wsrep::server_context::insert_streaming_applier(
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id,
wsrep::client_context* client_context)
{
if (streaming_appliers_.insert(
std::make_pair(std::make_pair(server_id, transaction_id),
client_context)).second == false)
{
wsrep::log_error() << "Could not insert streaming applier";
delete client_context;
throw wsrep::fatal_error();
}
}
// Private
void wsrep::server_context::state(

View File

@ -367,8 +367,7 @@ int wsrep::transaction_context::before_rollback()
// Voluntary rollback
if (is_streaming())
{
// Replicate rollback fragment
provider_.rollback(id_.get());
streaming_rollback();
}
state(lock, s_aborting);
break;
@ -652,11 +651,6 @@ int wsrep::transaction_context::certify_fragment(
lock.unlock();
if (streaming_context_.fragments_certified())
{
flags_ |= wsrep::provider::flag::start_transaction;
}
wsrep::mutable_buffer data;
if (client_context_.prepare_fragment_for_replication(*this, data))
{
@ -869,6 +863,18 @@ int wsrep::transaction_context::certify_commit(
return ret;
}
void wsrep::transaction_context::streaming_rollback()
{
assert(streaming_context_.rolled_back() == false);
wsrep::client_context& applier_context(
client_context_.server_context().streaming_applier_client_context(
client_context_.server_context().id(), id()));
applier_context.adopt_transaction(*this);
streaming_context_.cleanup();
// Replicate rollback fragment
provider_.rollback(id_.get());
}
void wsrep::transaction_context::clear_fragments()
{
streaming_context_.cleanup();
@ -876,13 +882,10 @@ void wsrep::transaction_context::clear_fragments()
void wsrep::transaction_context::cleanup()
{
assert(is_streaming() == false);
debug_log_state("cleanup_enter");
id_ = wsrep::transaction_id::invalid();
ws_handle_ = wsrep::ws_handle();
if (is_streaming())
{
state_ = s_executing;
}
// Keep the state history for troubleshooting. Reset at start_transaction().
// state_hist_.clear();
ws_meta_ = wsrep::ws_meta();

View File

@ -71,6 +71,7 @@ namespace wsrep
}
if (rolls_back_transaction(flags))
{
assert(0);
++rollback_fragments_;
}
@ -116,7 +117,11 @@ namespace wsrep
int append_data(wsrep::ws_handle&, const wsrep::const_buffer&)
{ return 0; }
int rollback(const wsrep::transaction_id)
{ return next_error_; }
{
++fragments_;
++rollback_fragments_;
return 0;
}
enum wsrep::provider::status
commit_order_enter(const wsrep::ws_handle&,
const wsrep::ws_meta&)
@ -170,7 +175,7 @@ namespace wsrep
size_t start_fragments() const { return start_fragments_; }
size_t fragments() const { return fragments_; }
size_t commit_fragments() const { return commit_fragments_; }
size_t rollback_fragments() const { return commit_fragments_; }
size_t rollback_fragments() const { return rollback_fragments_; }
private:
wsrep::id group_id_;

View File

@ -33,7 +33,14 @@ namespace wsrep
return new wsrep::mock_client_context(*this, ++last_client_id_,
wsrep::client_context::m_local);
}
wsrep::client_context& streaming_applier_client_context(
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id)
{
wsrep::client_context* sac(new wsrep::mock_client_context(*this, ++last_client_id_, wsrep::client_context::m_applier));
insert_streaming_applier(server_id, transaction_id, sac);
return *sac;
}
void on_connect() WSREP_OVERRIDE { }
void wait_until_connected() WSREP_OVERRIDE { }
void on_view(const wsrep::view&) WSREP_OVERRIDE { }

View File

@ -1202,7 +1202,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit,
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit,
@ -1222,3 +1221,59 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit,
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit_two_statements,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 2);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 3);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit_two_statements,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(cc.before_statement() == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 2);
BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(cc.after_prepare() == 0);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 3);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_rollback,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_rollback() == 0);
BOOST_REQUIRE(cc.after_rollback() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
}