1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-03 16:22:35 +03:00

TOI replicating and applying codepaths

This commit is contained in:
Teemu Ollakka
2018-06-25 09:28:51 +03:00
parent 6a369a5d0e
commit cad9176474
11 changed files with 324 additions and 169 deletions

View File

@ -5,7 +5,7 @@
#include "db_client_service.hpp" #include "db_client_service.hpp"
#include "db_client.hpp" #include "db_client.hpp"
int db::client_service::apply(const wsrep::const_buffer&) int db::client_service::apply_write_set(const wsrep::const_buffer&)
{ {
db::client* client(client_state_.client()); db::client* client(client_state_.client());
client->se_trx_.start(client); client->se_trx_.start(client);
@ -13,6 +13,11 @@ int db::client_service::apply(const wsrep::const_buffer&)
return 0; return 0;
} }
int db::client_service::apply_toi(const wsrep::const_buffer&)
{
return 0;
}
int db::client_service::commit(const wsrep::ws_handle&, int db::client_service::commit(const wsrep::ws_handle&,
const wsrep::ws_meta&) const wsrep::ws_meta&)
{ {

View File

@ -67,7 +67,9 @@ namespace db
void remove_fragments() override void remove_fragments() override
{ } { }
int apply(const wsrep::const_buffer&) override; int apply_write_set(const wsrep::const_buffer&) override;
int apply_toi(const wsrep::const_buffer&) override;
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;

View File

@ -81,8 +81,23 @@ namespace wsrep
/** /**
* Apply a write set. * Apply a write set.
*
* A write set applying happens always
* as a part of the transaction. The caller must start a
* new transaction before applying a write set and must
* either commit to make changes persistent or roll back.
*/ */
virtual int apply(const wsrep::const_buffer&) = 0; virtual int apply_write_set(const wsrep::const_buffer&) = 0;
/**
* Apply a TOI operation.
*
* TOI operation is a standalone operation and should not
* be executed as a part of a transaction.
*/
virtual int apply_toi(const wsrep::const_buffer&) = 0;
// //
// Interface to global server state // Interface to global server state
// //

View File

@ -399,6 +399,7 @@ namespace wsrep
/** /**
* Enter total order isolation critical section. * Enter total order isolation critical section.
*
* @param key_array Array of keys * @param key_array Array of keys
* @param buffer Buffer containing the action to execute inside * @param buffer Buffer containing the action to execute inside
* total order isolation section * total order isolation section
@ -409,7 +410,30 @@ namespace wsrep
int enter_toi(const wsrep::key_array& key_array, int enter_toi(const wsrep::key_array& key_array,
const wsrep::const_buffer& buffer, const wsrep::const_buffer& buffer,
int flags); int flags);
/**
* Enter applying total order critical section.
*
* @param ws_meta Write set meta data
*/
int enter_toi(const wsrep::ws_meta& ws_meta);
/**
* Return true if the client_state is under TOI operation.
*/
bool in_toi() const
{
return (toi_meta_.seqno().is_undefined() == false);
}
/**
* Return the mode where client entered into TOI mode.
* The return value can be either m_replicating or
* m_high_priority.
*/
enum mode toi_mode() const
{
return toi_mode_;
}
/** /**
* Leave total order isolation critical section. * Leave total order isolation critical section.
*/ */
@ -556,6 +580,7 @@ namespace wsrep
, client_service_(client_service) , client_service_(client_service)
, id_(id) , id_(id)
, mode_(mode) , mode_(mode)
, toi_mode_()
, state_(s_none) , state_(s_none)
, transaction_(*this) , transaction_(*this)
, toi_meta_() , toi_meta_()
@ -585,6 +610,7 @@ namespace wsrep
wsrep::client_service& client_service_; wsrep::client_service& client_service_;
wsrep::client_id id_; wsrep::client_id id_;
enum mode mode_; enum mode mode_;
enum mode toi_mode_;
enum state state_; enum state state_;
wsrep::transaction transaction_; wsrep::transaction transaction_;
wsrep::ws_meta toi_meta_; wsrep::ws_meta toi_meta_;

View File

@ -344,7 +344,20 @@ namespace wsrep
return (flags & wsrep::provider::flag::rollback); return (flags & wsrep::provider::flag::rollback);
} }
static inline bool is_toi(int flags)
{
return (flags & wsrep::provider::flag::isolation);
}
static inline bool is_commutative(int flags)
{
return (flags & wsrep::provider::flag::commutative);
}
static inline bool is_native(int flags)
{
return (flags & wsrep::provider::flag::native);
}
} }
#endif // WSREP_PROVIDER_HPP #endif // WSREP_PROVIDER_HPP

View File

@ -242,12 +242,14 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys,
int flags) int flags)
{ {
assert(state_ == s_exec); assert(state_ == s_exec);
assert(mode_ == m_replicating);
int ret; int ret;
switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags)) switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags))
{ {
case wsrep::provider::success: case wsrep::provider::success:
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
toi_mode_ = mode_;
mode(lock, m_toi); mode(lock, m_toi);
ret = 0; ret = 0;
break; break;
@ -260,22 +262,38 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys,
return ret; return ret;
} }
int wsrep::client_state::enter_toi(const wsrep::ws_meta& ws_meta)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_high_priority);
toi_mode_ = mode_;
mode(lock, m_toi);
toi_meta_ = ws_meta;
return 0;
}
int wsrep::client_state::leave_toi() int wsrep::client_state::leave_toi()
{ {
int ret; int ret;
switch (provider().leave_toi(id_)) if (toi_mode_ == m_replicating)
{ {
case wsrep::provider::success: switch (provider().leave_toi(id_))
ret = 0; {
break; case wsrep::provider::success:
default: ret = 0;
assert(0); break;
override_error(wsrep::e_error_during_commit); default:
ret = 1; assert(0);
break; override_error(wsrep::e_error_during_commit);
ret = 1;
break;
}
} }
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
mode(lock, m_replicating); mode(lock, toi_mode_);
toi_mode_ = m_local;
toi_meta_ = wsrep::ws_meta();
return ret; return ret;
} }
// Private // Private
@ -326,11 +344,11 @@ void wsrep::client_state::mode(
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
static const char allowed[mode_max_][mode_max_] = static const char allowed[mode_max_][mode_max_] =
{ { /* l r h t */
{ 0, 0, 0, 0}, /* local */ { 0, 0, 0, 0 }, /* local */
{ 0, 0, 1, 1}, /* repl */ { 0, 0, 1, 1 }, /* repl */
{ 0, 1, 0, 0}, /* high prio */ { 0, 1, 0, 1 }, /* high prio */
{ 0, 1, 0, 0} /* toi */ { 0, 1, 1, 0 } /* toi */
}; };
if (allowed[mode_][mode]) if (allowed[mode_][mode])
{ {

View File

@ -4,4 +4,4 @@
#include "wsrep/exception.hpp" #include "wsrep/exception.hpp"
bool wsrep::abort_on_exception(false); bool wsrep::abort_on_exception(true);

View File

@ -15,6 +15,196 @@
namespace namespace
{ {
int apply_write_set(wsrep::server_state& server_state,
wsrep::client_state& client_state,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret(0);
const wsrep::transaction& txc(client_state.transaction());
assert(client_state.mode() == wsrep::client_state::m_high_priority);
bool not_replaying(txc.state() !=
wsrep::transaction::s_replaying);
if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()))
{
if (not_replaying)
{
client_state.before_command();
client_state.before_statement();
assert(txc.active() == false);
client_state.start_transaction(ws_handle, ws_meta);
}
else
{
client_state.start_replaying(ws_meta);
}
if (client_state.client_service().apply_write_set(data))
{
ret = 1;
}
else if (client_state.client_service().commit(ws_handle, ws_meta))
{
ret = 1;
}
if (ret)
{
client_state.client_service().rollback();
}
if (not_replaying)
{
client_state.after_statement();
client_state.after_command_before_result();
client_state.after_command_after_result();
}
assert(ret ||
txc.state() == wsrep::transaction::s_committed);
}
else if (wsrep::starts_transaction(ws_meta.flags()))
{
assert(not_replaying);
assert(server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::client_state* sac(
server_state.server_service().streaming_applier_client_state());
server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sac);
sac->start_transaction(ws_handle, ws_meta);
{
// TODO: Client context switch will be ultimately
// implemented by the application. Therefore need to
// introduce a virtual method call which takes
// both original and sac client contexts as argument
// and a functor which does the applying.
wsrep::client_state_switch sw(client_state, *sac);
sac->before_command();
sac->before_statement();
sac->client_service().apply_write_set(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
}
server_state.server_service().log_dummy_write_set(client_state, ws_meta);
}
else if (ws_meta.flags() == 0)
{
wsrep::client_state* sac(
server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sac == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
wsrep::client_state_switch(client_state, *sac);
sac->before_command();
sac->before_statement();
ret = sac->client_service().apply_write_set(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
}
server_state.server_service().log_dummy_write_set(
client_state, ws_meta);
}
else if (wsrep::commits_transaction(ws_meta.flags()))
{
if (not_replaying)
{
wsrep::client_state* sac(
server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
assert(sac);
if (sac == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning()
<< "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
wsrep::client_state_switch(client_state, *sac);
sac->before_command();
sac->before_statement();
ret = sac->client_service().commit(ws_handle, ws_meta);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_client_state(sac);
}
}
else
{
ret = client_state.start_replaying(ws_meta) ||
client_state.client_service().apply_write_set(
wsrep::const_buffer()) ||
client_state.client_service().commit(ws_handle, ws_meta);
}
}
else
{
// SR fragment applying not implemented yet
assert(0);
}
if (not_replaying)
{
assert(txc.active() == false);
}
return ret;
}
int apply_toi(wsrep::server_service&,
wsrep::client_state& client_state,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()))
{
// Regular toi
client_state.enter_toi(ws_meta);
int ret(client_state.client_service().apply_toi(data));
client_state.leave_toi();
return ret;
}
else if (wsrep::starts_transaction(ws_meta.flags()))
{
// NBO begin
throw wsrep::not_implemented_error();
}
else if (wsrep::commits_transaction(ws_meta.flags()))
{
// NBO end
throw wsrep::not_implemented_error();
}
else
{
assert(0);
return 0;
}
}
} }
@ -315,154 +505,21 @@ int wsrep::server_state::on_apply(
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data)
{ {
int ret(0); if (is_toi(ws_meta.flags()))
const wsrep::transaction& txc(client_state.transaction());
assert(client_state.mode() == wsrep::client_state::m_high_priority);
bool not_replaying(txc.state() !=
wsrep::transaction::s_replaying);
if (starts_transaction(ws_meta.flags()) &&
commits_transaction(ws_meta.flags()))
{ {
if (not_replaying) return apply_toi(server_service_, client_state, ws_meta, data);
{
client_state.before_command();
client_state.before_statement();
assert(txc.active() == false);
client_state.start_transaction(ws_handle, ws_meta);
}
else
{
client_state.start_replaying(ws_meta);
}
if (client_state.client_service().apply(data))
{
ret = 1;
}
else if (client_state.client_service().commit(ws_handle, ws_meta))
{
ret = 1;
}
if (ret)
{
client_state.client_service().rollback();
}
if (not_replaying)
{
client_state.after_statement();
client_state.after_command_before_result();
client_state.after_command_after_result();
}
assert(ret ||
txc.state() == wsrep::transaction::s_committed);
} }
else if (starts_transaction(ws_meta.flags())) else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags()))
{ {
assert(not_replaying); // Not implemented yet.
assert(find_streaming_applier( assert(0);
ws_meta.server_id(), ws_meta.transaction_id()) == 0); return 0;
wsrep::client_state* sac(server_service_.streaming_applier_client_state());
start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sac);
sac->start_transaction(ws_handle, ws_meta);
{
// TODO: Client context switch will be ultimately
// implemented by the application. Therefore need to
// introduce a virtual method call which takes
// both original and sac client contexts as argument
// and a functor which does the applying.
wsrep::client_state_switch sw(client_state, *sac);
sac->before_command();
sac->before_statement();
sac->client_service().apply(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
}
server_service_.log_dummy_write_set(client_state, ws_meta);
}
else if (ws_meta.flags() == 0)
{
wsrep::client_state* sac(
find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sac == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
wsrep::client_state_switch(client_state, *sac);
sac->before_command();
sac->before_statement();
ret = sac->client_service().apply(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
}
server_service_.log_dummy_write_set(client_state, ws_meta);
}
else if (commits_transaction(ws_meta.flags()))
{
if (not_replaying)
{
wsrep::client_state* sac(
find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
assert(sac);
if (sac == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
wsrep::client_state_switch(client_state, *sac);
sac->before_command();
sac->before_statement();
ret = sac->client_service().commit(ws_handle, ws_meta);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_service_.release_client_state(sac);
}
}
else
{
ret = client_state.start_replaying(ws_meta) ||
client_state.client_service().apply(wsrep::const_buffer()) ||
client_state.client_service().commit(ws_handle, ws_meta);
}
} }
else else
{ {
// SR fragment applying not implemented yet return apply_write_set(*this, client_state,
assert(0); ws_handle, ws_meta, data);
} }
if (not_replaying)
{
assert(txc.active() == false);
}
return ret;
} }
void wsrep::server_state::start_streaming_applier( void wsrep::server_state::start_streaming_applier(

View File

@ -357,8 +357,6 @@ namespace
const wsrep_trx_meta_t* meta, const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop __attribute__((unused))) wsrep_bool_t* exit_loop __attribute__((unused)))
{ {
wsrep_cb_status_t ret(WSREP_CB_SUCCESS);
wsrep::client_state* client_state( wsrep::client_state* client_state(
reinterpret_cast<wsrep::client_state*>(ctx)); reinterpret_cast<wsrep::client_state*>(ctx));
assert(client_state); assert(client_state);
@ -375,13 +373,22 @@ namespace
meta->stid.trx, meta->stid.trx,
meta->stid.conn), wsrep::seqno(seqno_from_native(meta->depends_on)), meta->stid.conn), wsrep::seqno(seqno_from_native(meta->depends_on)),
map_flags_from_native(flags)); map_flags_from_native(flags));
if (ret == WSREP_CB_SUCCESS && try
client_state->server_state().on_apply(
*client_state, ws_handle, ws_meta, data))
{ {
ret = WSREP_CB_FAILURE; if (client_state->server_state().on_apply(
*client_state, ws_handle, ws_meta, data))
{
return WSREP_CB_FAILURE;
}
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
{
wsrep::log_error() << "Caught runtime error while applying "
<< ws_meta.flags() << ": "
<< e.what();
return WSREP_CB_FAILURE;
} }
return ret;
} }
wsrep_cb_status_t synced_cb(void* app_ctx) wsrep_cb_status_t synced_cb(void* app_ctx)

View File

@ -6,15 +6,23 @@
#include "mock_client_state.hpp" #include "mock_client_state.hpp"
int wsrep::mock_client_service::apply( int wsrep::mock_client_service::apply_write_set(
const wsrep::const_buffer&) const wsrep::const_buffer&)
{ {
assert(client_state_.toi_meta().seqno().is_undefined());
assert(client_state_.transaction().state() == wsrep::transaction::s_executing || assert(client_state_.transaction().state() == wsrep::transaction::s_executing ||
client_state_.transaction().state() == wsrep::transaction::s_replaying); client_state_.transaction().state() == wsrep::transaction::s_replaying);
return (fail_next_applying_ ? 1 : 0); return (fail_next_applying_ ? 1 : 0);
} }
int wsrep::mock_client_service::apply_toi(const wsrep::const_buffer&)
{
assert(client_state_.transaction().active() == false);
assert(client_state_.toi_meta().seqno().is_undefined() == false);
return (fail_next_toi_ ? 1 : 0);
}
int wsrep::mock_client_service::commit( int wsrep::mock_client_service::commit(
const wsrep::ws_handle&, const wsrep::ws_meta&) const wsrep::ws_handle&, const wsrep::ws_meta&)
{ {

View File

@ -47,6 +47,7 @@ namespace wsrep
, is_autocommit_() , is_autocommit_()
, do_2pc_() , do_2pc_()
, fail_next_applying_() , fail_next_applying_()
, fail_next_toi_()
, bf_abort_during_wait_() , bf_abort_during_wait_()
, error_during_prepare_data_() , error_during_prepare_data_()
, killed_before_certify_() , killed_before_certify_()
@ -58,7 +59,9 @@ namespace wsrep
, aborts_() , aborts_()
{ } { }
int apply(const wsrep::const_buffer&) WSREP_OVERRIDE; int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE;
int apply_toi(const wsrep::const_buffer&) WSREP_OVERRIDE;
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE; WSREP_OVERRIDE;
@ -169,6 +172,7 @@ namespace wsrep
bool is_autocommit_; bool is_autocommit_;
bool do_2pc_; bool do_2pc_;
bool fail_next_applying_; bool fail_next_applying_;
bool fail_next_toi_;
bool bf_abort_during_wait_; bool bf_abort_during_wait_;
bool error_during_prepare_data_; bool error_during_prepare_data_;
bool killed_before_certify_; bool killed_before_certify_;