1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

Initial replicating side implementation of streaming replication

This commit is contained in:
Teemu Ollakka
2018-06-12 15:44:20 +03:00
parent d9d41a4787
commit 0186342092
7 changed files with 243 additions and 41 deletions

View File

@ -229,6 +229,13 @@ namespace wsrep
return transaction_.start_transaction(wsh, meta);
}
void enable_streaming(
enum wsrep::transaction_context::streaming_context::fragment_unit fragment_unit,
size_t fragment_size)
{
transaction_.streaming_context_.enable(
fragment_unit, fragment_size);
}
int append_key(const wsrep::key& key)
{
assert(state_ == s_exec);
@ -240,6 +247,13 @@ namespace wsrep
assert(state_ == s_exec);
return transaction_.append_data(data);
}
int after_row()
{
assert(state_ == s_exec);
return transaction_.after_row();
}
int before_prepare()
{
assert(state_ == s_exec);

View File

@ -129,6 +129,7 @@ namespace wsrep
int flags_;
};
// Abstract interface for provider implementations
class provider
{
@ -284,6 +285,22 @@ namespace wsrep
return ret;
}
static inline bool starts_transaction(int flags)
{
return (flags & wsrep::provider::flag::start_transaction);
}
static inline bool commits_transaction(int flags)
{
return (flags & wsrep::provider::flag::commit);
}
static inline bool rolls_back_transaction(int flags)
{
return (flags & wsrep::provider::flag::rollback);
}
}
#endif // WSREP_PROVIDER_HPP

View File

@ -150,8 +150,79 @@ namespace wsrep
bool pa_unsafe_;
bool certified_;
public:
class streaming_context
{
public:
enum fragment_unit
{
row,
bytes,
statement
};
streaming_context()
: fragments_()
, rollback_replicated_for_()
, fragment_unit_()
, fragment_size_()
, bytes_certified_()
, unit_counter_()
{ }
void enable(enum fragment_unit fragment_unit, size_t fragment_size)
{
assert(fragment_size > 0);
fragment_unit_ = fragment_unit;
fragment_size_ = fragment_size;
}
enum fragment_unit fragment_unit() const { return fragment_unit_; }
size_t fragment_size() const { return fragment_size_; }
void disable()
{
fragment_size_ = 0;
}
void certified(wsrep::seqno seqno)
{
fragments_.push_back(seqno);
}
size_t fragments_certified() const
{
return fragments_.size();
}
size_t bytes_certified() const
{
return bytes_certified_;
}
void rolled_back(wsrep::transaction_id id)
{
assert(rollback_replicated_for_ == wsrep::transaction_id::invalid());
rollback_replicated_for_ = id;
}
bool rolled_back() const
{
return (rollback_replicated_for_ !=
wsrep::transaction_id::invalid());
}
size_t unit_counter() const { return unit_counter_; }
void increment_unit_counter() { ++unit_counter_; }
private:
std::vector<wsrep::seqno> fragments_;
wsrep::transaction_id rollback_replicated_for_;
enum fragment_unit fragment_unit_;
size_t fragment_size_;
size_t bytes_certified_;
size_t unit_counter_;
} streaming_context_;
};
static inline std::string to_string(enum wsrep::transaction_context::state state)

View File

@ -6,6 +6,7 @@
#define WSREP_MOCK_PROVIDER_HPP
#include "wsrep/provider.hpp"
#include "wsrep/logger.hpp"
#include <cstring>
#include <map>
@ -25,6 +26,10 @@ namespace wsrep
, group_seqno_(0)
, bf_abort_map_()
, next_error_(wsrep::provider::success)
, start_fragments_()
, fragments_()
, commit_fragments_()
, rollback_fragments_()
{ }
int connect(const std::string&, const std::string&, const std::string&,
@ -43,15 +48,28 @@ namespace wsrep
int flags,
wsrep::ws_meta& ws_meta)
{
assert(flags & wsrep::provider::flag::start_transaction);
wsrep::log_info() << "provider certify: "
<< "client: " << client_id.get()
<< " flags: " << std::hex << flags;
if (next_error_)
{
return next_error_;
}
if ((flags & wsrep::provider::flag::commit) == 0)
++fragments_;
if (starts_transaction(flags))
{
return wsrep::provider::error_provider_failed;
++start_fragments_;
}
if (commits_transaction(flags))
{
++commit_fragments_;
}
if (rolls_back_transaction(flags))
{
++rollback_fragments_;
}
wsrep::stid stid(server_id_,
ws_handle.transaction_id(),
client_id);
@ -145,12 +163,21 @@ namespace wsrep
{
next_error_ = error;
}
size_t start_fragments() const { return start_fragments_; }
size_t fragments() const { return fragments_; }
size_t commit_fragments() const { return commit_fragments_; }
size_t rollback_fragments() const { return commit_fragments_; }
private:
wsrep::id group_id_;
wsrep::id server_id_;
long long group_seqno_;
bf_abort_map bf_abort_map_;
enum wsrep::provider::status next_error_;
size_t start_fragments_;
size_t fragments_;
size_t commit_fragments_;
size_t rollback_fragments_;
};
}

View File

@ -14,20 +14,6 @@
namespace
{
inline bool starts_transaction(const wsrep::ws_meta& ws_meta)
{
return (ws_meta.flags() & wsrep::provider::flag::start_transaction);
}
inline bool commits_transaction(const wsrep::ws_meta& ws_meta)
{
return (ws_meta.flags() & wsrep::provider::flag::commit);
}
inline bool rolls_back_transaction(const wsrep::ws_meta& ws_meta)
{
return (ws_meta.flags() & wsrep::provider::flag::rollback);
}
}
@ -139,7 +125,8 @@ int wsrep::server_context::on_apply(
bool not_replaying(txc.state() !=
wsrep::transaction_context::s_replaying);
if (starts_transaction(ws_meta) && commits_transaction(ws_meta))
if (starts_transaction(ws_meta.flags()) &&
commits_transaction(ws_meta.flags()))
{
if (not_replaying)
{

View File

@ -36,8 +36,7 @@ wsrep::transaction_context::transaction_context(
, flags_()
, pa_unsafe_(false)
, certified_(false)
, fragments_()
, rollback_replicated_for_(false)
, streaming_context_()
{ }
@ -94,6 +93,29 @@ int wsrep::transaction_context::append_data(const wsrep::data& data)
return provider_.append_data(ws_handle_, data);
}
int wsrep::transaction_context::after_row()
{
wsrep::unique_lock<wsrep::mutex> lock(client_context_.mutex());
if (streaming_context_.fragment_size() > 0)
{
switch (streaming_context_.fragment_unit())
{
case streaming_context::row:
streaming_context_.increment_unit_counter();
if (streaming_context_.fragments_certified()
+ streaming_context_.unit_counter() >=
streaming_context_.fragment_size())
{
return certify_fragment(lock);
}
break;
default:
assert(0);
}
}
return 0;
}
int wsrep::transaction_context::before_prepare()
{
int ret(0);
@ -585,7 +607,7 @@ void wsrep::transaction_context::state(
{ /* ex pr ce co oc ct cf ma ab ad mr re */
{ 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
{ 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */
{ 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
{ 1, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
{ 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
{ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
@ -612,16 +634,13 @@ void wsrep::transaction_context::state(
}
}
#if 0
int wsrep::transaction_context::certify_fragment(
wsrep::unique_lock<wsrep::mutex>& lock)
{
// This method is not fully implemented and tested yet.
throw wsrep::not_implemented_error();
assert(lock.owns_lock());
assert(client_context_.mode() == wsrep::client_context::m_replicating);
assert(rollback_replicated_for_ != id_);
assert(streaming_context_.rolled_back() == false);
client_context_.wait_for_replayers(lock);
if (state() == s_must_abort)
@ -634,10 +653,9 @@ int wsrep::transaction_context::certify_fragment(
lock.unlock();
uint32_t flags(0);
if (fragments_.empty())
if (streaming_context_.fragments_certified())
{
flags |= wsrep::provider::flag::start_transaction;
flags_ |= wsrep::provider::flag::start_transaction;
}
wsrep::data data;
@ -656,8 +674,14 @@ int wsrep::transaction_context::certify_fragment(
wsrep::client_context_switch client_context_switch(
client_context_,
*sr_client_context);
wsrep::transaction_context sr_transaction_context(*sr_client_context);
if (sr_client_context->append_fragment(sr_transaction_context, flags, data))
wsrep::unique_lock<wsrep::mutex> sr_lock(sr_client_context->mutex());
wsrep::transaction_context& sr_transaction_context(
sr_client_context->transaction_);
sr_transaction_context.state(sr_lock, s_certifying);
sr_lock.unlock();
if (sr_client_context->append_fragment(
sr_transaction_context, flags_, data))
{
lock.lock();
state(lock, s_must_abort);
@ -667,24 +691,43 @@ int wsrep::transaction_context::certify_fragment(
enum wsrep::provider::status
cert_ret(provider_.certify(client_context_.id().get(),
&sr_transaction_context.ws_handle_,
flags,
&sr_transaction_context.trx_meta_));
sr_transaction_context.ws_handle_,
flags_,
sr_transaction_context.ws_meta_));
int ret(0);
switch (cert_ret)
{
case wsrep::provider::success:
sr_client_context->commit(sr_transaction_context);
streaming_context_.certified(sr_transaction_context.ws_meta().seqno());
sr_lock.lock();
sr_transaction_context.certified_ = true;
sr_transaction_context.state(sr_lock, s_committing);
sr_lock.unlock();
if (sr_client_context->commit())
{
ret = 1;
}
break;
default:
sr_client_context->rollback(sr_transaction_context);
sr_client_context->rollback();
ret = 1;
break;
}
lock.lock();
if (ret)
{
client_context_.provider().rollback(id_);
streaming_context_.rolled_back(id_);
state(lock, s_must_abort);
}
else
{
state(lock, s_executing);
flags_ &= ~wsrep::provider::flag::start_transaction;
}
return ret;
}
#endif
int wsrep::transaction_context::certify_commit(
wsrep::unique_lock<wsrep::mutex>& lock)

View File

@ -102,6 +102,26 @@ namespace
const wsrep::transaction_context& tc;
};
struct streaming_client_fixture_row
{
streaming_client_fixture_row()
: sc("s1", "s1", wsrep::server_context::rm_sync)
, cc(sc, wsrep::client_id(1),
wsrep::client_context::m_replicating)
, tc(cc.transaction())
{
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing);
cc.enable_streaming(wsrep::transaction_context::streaming_context::row, 1);
}
wsrep::mock_server_context sc;
wsrep::mock_client_context cc;
const wsrep::transaction_context& tc;
};
typedef
boost::mpl::vector<replicating_client_fixture_sync_rm,
replicating_client_fixture_async_rm>
@ -519,14 +539,15 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
//
// Test a 1PC transaction which gets BF aborted during before_commit via
// provider after the write set was ordered and certified.
// provider after the write set was ordered and certified. This must
// result replaying of transaction.
//
BOOST_FIXTURE_TEST_CASE_TEMPLATE(
transaction_context_1pc_bf_during_before_commit_certified, T,
replicating_fixtures, T)
{
wsrep::mock_server_context& sc(T::sc);
wsrep::client_context& cc(T::cc);
wsrep::mock_client_context& cc(T::cc);
const wsrep::transaction_context& tc(T::tc);
// Start a new transaction with ID 1
@ -555,6 +576,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(cc.current_error() == wsrep::e_success);
BOOST_REQUIRE(cc.replays() == 1);
}
//
@ -1161,3 +1183,24 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_applying_rollback,
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(cc.current_error() == wsrep::e_success);
}
///////////////////////////////////////////////////////////////////////////////
// STREAMING REPLICATION //
///////////////////////////////////////////////////////////////////////////////
BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_1pc_commit,
streaming_client_fixture_row)
{
BOOST_REQUIRE(cc.start_transaction(1) == 0);
BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(cc.ordered_commit() == 0);
BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_success);
BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
}