1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-24 10:42:31 +03:00

Cleaned up client_state interface.

This commit is contained in:
Teemu Ollakka
2018-06-18 18:55:38 +03:00
parent cf231bdf2d
commit 60fb119fa1
8 changed files with 45 additions and 156 deletions

View File

@ -121,7 +121,9 @@ void db::client::run_one_transaction()
err = err || client_state_.after_commit(); err = err || client_state_.after_commit();
if (err) if (err)
{ {
client_state_.rollback(); client_state_.before_rollback();
se_trx_.rollback();
client_state_.after_rollback();
} }
return err; return err;
}); });

View File

@ -2,6 +2,8 @@
// Copyright (C) 2018 Codership Oy <info@codership.com> // Copyright (C) 2018 Codership Oy <info@codership.com>
// //
/** @file db_client.hpp */
#ifndef WSREP_DB_CLIENT_HPP #ifndef WSREP_DB_CLIENT_HPP
#define WSREP_DB_CLIENT_HPP #define WSREP_DB_CLIENT_HPP

View File

@ -773,7 +773,7 @@ WARN_LOGFILE =
# spaces. See also FILE_PATTERNS and EXTENSION_MAPPING # spaces. See also FILE_PATTERNS and EXTENSION_MAPPING
# Note: If this tag is empty the current directory is searched. # Note: If this tag is empty the current directory is searched.
INPUT = ../include/wsrep INPUT = ../include/wsrep ../dbsim ../src
# This tag can be used to specify the character encoding of the source files # This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

View File

@ -112,18 +112,6 @@ namespace wsrep
{ {
assert(transaction_.active() == false); assert(transaction_.active() == false);
} }
/**
* Return true if the transaction commit requires
* two-phase commit.
*
* @todo deprecate
*/
bool do_2pc() const
{
return client_service_.do_2pc();
}
/** @name Client command handling */ /** @name Client command handling */
/** @{ */ /** @{ */
/** /**
@ -242,10 +230,6 @@ namespace wsrep
return transaction_.append_data(data); return transaction_.append_data(data);
} }
int prepare_data_for_replication()
{
return client_service_.prepare_data_for_replication();
}
/** @} */ /** @} */
/** @name Streaming replication interface */ /** @name Streaming replication interface */
@ -277,43 +261,6 @@ namespace wsrep
enum wsrep::streaming_context::fragment_unit enum wsrep::streaming_context::fragment_unit
fragment_unit, fragment_unit,
size_t fragment_size); size_t fragment_size);
bool statement_allowed_for_streaming() const
{
return client_service_.statement_allowed_for_streaming();
}
/** @todo deprecate */
size_t bytes_generated() const
{
assert(mode_ == m_replicating);
return client_service_.bytes_generated();
}
/** @todo deprecate */
int prepare_fragment_for_replication(wsrep::mutable_buffer& mb)
{
return client_service_.prepare_fragment_for_replication(mb);
}
/** @todo deprecate */
int append_fragment(const wsrep::transaction& tc,
int flags,
const wsrep::const_buffer& buf)
{
return client_service_.append_fragment(tc, flags, buf);
}
/**
* Remove fragments from the fragment storage. If the
* storage is transactional, this should be done within
* the same transaction which is committing.
*
* @todo deprecate
*/
void remove_fragments()
{
client_service_.remove_fragments();
}
/** @} */
/** @name Applying interface */ /** @name Applying interface */
/** @{ */ /** @{ */
int start_transaction(const wsrep::ws_handle& wsh, int start_transaction(const wsrep::ws_handle& wsh,
@ -323,21 +270,6 @@ namespace wsrep
return transaction_.start_transaction(wsh, meta); return transaction_.start_transaction(wsh, meta);
} }
/** @todo deprecate */
int apply(const wsrep::const_buffer& data)
{
assert(mode_ == m_high_priority);
return client_service_.apply(data);
}
int commit()
{
assert(mode_ == m_high_priority || mode_ == m_local);
return client_service_.commit(
transaction_.ws_handle(), transaction_.ws_meta());
}
/** @} */
/** @name Commit ordering interface */ /** @name Commit ordering interface */
/** @{ */ /** @{ */
int before_prepare() int before_prepare()
@ -372,14 +304,6 @@ namespace wsrep
return transaction_.after_commit(); return transaction_.after_commit();
} }
/** @} */ /** @} */
/** @name Rollback interface */
/** @{ */
int rollback()
{
return client_service_.rollback();
}
int before_rollback() int before_rollback()
{ {
assert(state_ == s_idle || state_ == s_exec || state_ == s_result); assert(state_ == s_idle || state_ == s_exec || state_ == s_result);
@ -419,37 +343,6 @@ namespace wsrep
transaction_.streaming_context_ = transaction.streaming_context_; transaction_.streaming_context_ = transaction.streaming_context_;
} }
/** @todo deprecate */
enum wsrep::provider::status replay(
wsrep::transaction&)
{
return client_service_.replay();
}
//
//
//
void will_replay(const wsrep::transaction&)
{
client_service_.will_replay();
}
/** @todo deprecate */
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
{
client_service_.wait_for_replayers(lock);
}
bool interrupted() const
{
return client_service_.interrupted();
}
void emergency_shutdown()
{
client_service_.emergency_shutdown();
}
// //
// Causal reads // Causal reads
// //
@ -499,19 +392,7 @@ namespace wsrep
* End non-blocking operation * End non-blocking operation
*/ */
void end_nbo(); void end_nbo();
/** @} */
//
// Debug interface
//
void debug_sync(const char* sync_point)
{
client_service_.debug_sync(sync_point);
}
void debug_crash(const char* crash_point)
{
client_service_.debug_crash(crash_point);
}
/** /**
* Get reference to the client mutex. * Get reference to the client mutex.
* *
@ -527,6 +408,8 @@ namespace wsrep
wsrep::server_state& server_state() const wsrep::server_state& server_state() const
{ return server_state_; } { return server_state_; }
wsrep::client_service& client_service() const
{ return client_service_; }
/** /**
* Get reference to the Provider which is associated * Get reference to the Provider which is associated
* with the client context. * with the client context.

View File

@ -17,6 +17,7 @@
namespace wsrep namespace wsrep
{ {
class client_service;
class client_state; class client_state;
class key; class key;
class const_buffer; class const_buffer;
@ -138,6 +139,7 @@ namespace wsrep
wsrep::provider& provider_; wsrep::provider& provider_;
wsrep::server_service& server_service_; wsrep::server_service& server_service_;
wsrep::client_service& client_service_;
wsrep::client_state& client_state_; wsrep::client_state& client_state_;
wsrep::transaction_id id_; wsrep::transaction_id id_;
enum state state_; enum state state_;

View File

@ -145,18 +145,18 @@ int wsrep::server_state::on_apply(
client_state.start_replaying(ws_meta); client_state.start_replaying(ws_meta);
} }
if (client_state.apply(data)) if (client_state.client_service().apply(data))
{ {
ret = 1; ret = 1;
} }
else if (client_state.commit()) else if (client_state.client_service().commit(ws_handle, ws_meta))
{ {
ret = 1; ret = 1;
} }
if (ret) if (ret)
{ {
client_state.rollback(); client_state.client_service().rollback();
} }
if (not_replaying) if (not_replaying)
@ -186,7 +186,7 @@ int wsrep::server_state::on_apply(
wsrep::client_state_switch sw(client_state, *sac); wsrep::client_state_switch sw(client_state, *sac);
sac->before_command(); sac->before_command();
sac->before_statement(); sac->before_statement();
sac->apply(data); sac->client_service().apply(data);
sac->after_statement(); sac->after_statement();
sac->after_command_before_result(); sac->after_command_before_result();
sac->after_command_after_result(); sac->after_command_after_result();
@ -214,7 +214,7 @@ int wsrep::server_state::on_apply(
wsrep::client_state_switch(client_state, *sac); wsrep::client_state_switch(client_state, *sac);
sac->before_command(); sac->before_command();
sac->before_statement(); sac->before_statement();
ret = sac->apply(data); ret = sac->client_service().apply(data);
sac->after_statement(); sac->after_statement();
sac->after_command_before_result(); sac->after_command_before_result();
sac->after_command_after_result(); sac->after_command_after_result();
@ -245,7 +245,7 @@ int wsrep::server_state::on_apply(
wsrep::client_state_switch(client_state, *sac); wsrep::client_state_switch(client_state, *sac);
sac->before_command(); sac->before_command();
sac->before_statement(); sac->before_statement();
ret = sac->commit(); ret = sac->client_service().commit(ws_handle, ws_meta);
sac->after_statement(); sac->after_statement();
sac->after_command_before_result(); sac->after_command_before_result();
sac->after_command_after_result(); sac->after_command_after_result();
@ -257,8 +257,8 @@ int wsrep::server_state::on_apply(
else else
{ {
ret = client_state.start_replaying(ws_meta) || ret = client_state.start_replaying(ws_meta) ||
client_state.apply(wsrep::const_buffer()) || client_state.client_service().apply(wsrep::const_buffer()) ||
client_state.commit(); client_state.client_service().commit(ws_handle, ws_meta);
} }
} }
else else

View File

@ -25,6 +25,7 @@ wsrep::transaction::transaction(
wsrep::client_state& client_state) wsrep::client_state& client_state)
: provider_(client_state.provider()) : provider_(client_state.provider())
, server_service_(client_state.server_state().server_service()) , server_service_(client_state.server_state().server_service())
, client_service_(client_state.client_service())
, client_state_(client_state) , client_state_(client_state)
, id_(transaction_id::invalid()) , id_(transaction_id::invalid())
, state_(s_executing) , state_(s_executing)
@ -123,7 +124,7 @@ int wsrep::transaction::after_row()
} }
break; break;
case streaming_context::bytes: case streaming_context::bytes:
if (client_state_.bytes_generated() >= if (client_service_.bytes_generated() >=
streaming_context_.bytes_certified() streaming_context_.bytes_certified()
+ streaming_context_.fragment_size()) + streaming_context_.fragment_size())
{ {
@ -161,20 +162,20 @@ int wsrep::transaction::before_prepare(
case wsrep::client_state::m_replicating: case wsrep::client_state::m_replicating:
if (is_streaming()) if (is_streaming())
{ {
client_state_.debug_crash( client_service_.debug_crash(
"crash_last_fragment_commit_before_fragment_removal"); "crash_last_fragment_commit_before_fragment_removal");
lock.unlock(); lock.unlock();
if (client_state_.statement_allowed_for_streaming() == false) if (client_service_.statement_allowed_for_streaming() == false)
{ {
client_state_.override_error(wsrep::e_error_during_commit); client_state_.override_error(wsrep::e_error_during_commit);
ret = 1; ret = 1;
} }
else else
{ {
client_state_.remove_fragments(); client_service_.remove_fragments();
} }
lock.lock(); lock.lock();
client_state_.debug_crash( client_service_.debug_crash(
"crash_last_fragment_commit_after_fragment_removal"); "crash_last_fragment_commit_after_fragment_removal");
} }
break; break;
@ -182,7 +183,7 @@ int wsrep::transaction::before_prepare(
case wsrep::client_state::m_high_priority: case wsrep::client_state::m_high_priority:
if (is_streaming()) if (is_streaming())
{ {
client_state_.remove_fragments(); client_service_.remove_fragments();
} }
break; break;
default: default:
@ -257,7 +258,7 @@ int wsrep::transaction::before_commit()
case wsrep::client_state::m_replicating: case wsrep::client_state::m_replicating:
if (state() == s_executing) if (state() == s_executing)
{ {
assert(client_state_.do_2pc() == false); assert(client_service_.do_2pc() == false);
ret = before_prepare(lock) || after_prepare(lock); ret = before_prepare(lock) || after_prepare(lock);
assert((ret == 0 && state() == s_committing) assert((ret == 0 && state() == s_committing)
|| ||
@ -312,7 +313,7 @@ int wsrep::transaction::before_commit()
case wsrep::client_state::m_high_priority: case wsrep::client_state::m_high_priority:
assert(certified()); assert(certified());
assert(ordered()); assert(ordered());
if (client_state_.do_2pc() == false) if (client_service_.do_2pc() == false)
{ {
ret = before_prepare(lock) || after_prepare(lock); ret = before_prepare(lock) || after_prepare(lock);
} }
@ -498,7 +499,7 @@ int wsrep::transaction::after_statement()
case s_cert_failed: case s_cert_failed:
client_state_.override_error(wsrep::e_deadlock_error); client_state_.override_error(wsrep::e_deadlock_error);
lock.unlock(); lock.unlock();
ret = client_state_.rollback(); ret = client_service_.rollback();
lock.lock(); lock.lock();
if (state() != s_must_replay) if (state() != s_must_replay)
{ {
@ -510,7 +511,7 @@ int wsrep::transaction::after_statement()
{ {
state(lock, s_replaying); state(lock, s_replaying);
lock.unlock(); lock.unlock();
enum wsrep::provider::status replay_ret(client_state_.replay(*this)); enum wsrep::provider::status replay_ret(client_service_.replay());
switch (replay_ret) switch (replay_ret)
{ {
case wsrep::provider::success: case wsrep::provider::success:
@ -522,7 +523,7 @@ int wsrep::transaction::after_statement()
ret = 1; ret = 1;
break; break;
default: default:
client_state_.emergency_shutdown(); client_service_.emergency_shutdown();
break; break;
} }
lock.lock(); lock.lock();
@ -697,7 +698,7 @@ int wsrep::transaction::certify_fragment(
assert(client_state_.mode() == wsrep::client_state::m_replicating); assert(client_state_.mode() == wsrep::client_state::m_replicating);
assert(streaming_context_.rolled_back() == false); assert(streaming_context_.rolled_back() == false);
client_state_.wait_for_replayers(lock); client_service_.wait_for_replayers(lock);
if (state() == s_must_abort) if (state() == s_must_abort)
{ {
client_state_.override_error(wsrep::e_deadlock_error); client_state_.override_error(wsrep::e_deadlock_error);
@ -709,7 +710,7 @@ int wsrep::transaction::certify_fragment(
lock.unlock(); lock.unlock();
wsrep::mutable_buffer data; wsrep::mutable_buffer data;
if (client_state_.prepare_fragment_for_replication(data)) if (client_service_.prepare_fragment_for_replication(data))
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
@ -719,8 +720,6 @@ int wsrep::transaction::certify_fragment(
// Client context to store fragment in separate transaction // Client context to store fragment in separate transaction
// Switch temporarily to sr_transaction, switch back // Switch temporarily to sr_transaction, switch back
// to original when this goes out of scope // to original when this goes out of scope
// std::auto_ptr<wsrep::client_state> sr_client_state(
// client_state_.server_state().local_client_state());
wsrep::scoped_client_state<wsrep::client_deleter> sr_client_state_scope( wsrep::scoped_client_state<wsrep::client_deleter> sr_client_state_scope(
server_service_.local_client_state(), server_service_.local_client_state(),
wsrep::client_deleter(server_service_)); wsrep::client_deleter(server_service_));
@ -735,7 +734,7 @@ int wsrep::transaction::certify_fragment(
sr_client_state.transaction_); sr_client_state.transaction_);
sr_transaction.state(sr_lock, s_certifying); sr_transaction.state(sr_lock, s_certifying);
sr_lock.unlock(); sr_lock.unlock();
if (sr_client_state.append_fragment( if (sr_client_state.client_service().append_fragment(
sr_transaction, flags_, sr_transaction, flags_,
wsrep::const_buffer(data.data(), data.size()))) wsrep::const_buffer(data.data(), data.size())))
{ {
@ -760,7 +759,8 @@ int wsrep::transaction::certify_fragment(
sr_transaction.certified_ = true; sr_transaction.certified_ = true;
sr_transaction.state(sr_lock, s_committing); sr_transaction.state(sr_lock, s_committing);
sr_lock.unlock(); sr_lock.unlock();
if (sr_client_state.commit()) if (sr_client_state.client_service().commit(
sr_transaction.ws_handle(), sr_transaction.ws_meta()))
{ {
ret = 1; ret = 1;
} }
@ -769,7 +769,7 @@ int wsrep::transaction::certify_fragment(
sr_lock.lock(); sr_lock.lock();
sr_transaction.state(sr_lock, s_must_abort); sr_transaction.state(sr_lock, s_must_abort);
sr_lock.unlock(); sr_lock.unlock();
sr_client_state.rollback(); sr_client_state.client_service().rollback();
ret = 1; ret = 1;
break; break;
} }
@ -791,7 +791,7 @@ int wsrep::transaction::certify_commit(
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
assert(active()); assert(active());
client_state_.wait_for_replayers(lock); client_service_.wait_for_replayers(lock);
assert(lock.owns_lock()); assert(lock.owns_lock());
@ -807,7 +807,7 @@ int wsrep::transaction::certify_commit(
lock.unlock(); lock.unlock();
if (client_state_.prepare_data_for_replication()) if (client_service_.prepare_data_for_replication())
{ {
// Note: Error must be set by prepare_data_for_replication() // Note: Error must be set by prepare_data_for_replication()
lock.lock(); lock.lock();
@ -816,7 +816,7 @@ int wsrep::transaction::certify_commit(
return 1; return 1;
} }
if (client_state_.interrupted()) if (client_service_.interrupted())
{ {
lock.lock(); lock.lock();
client_state_.override_error(wsrep::e_interrupted_error); client_state_.override_error(wsrep::e_interrupted_error);
@ -824,13 +824,13 @@ int wsrep::transaction::certify_commit(
return 1; return 1;
} }
client_state_.debug_sync("wsrep_before_certification"); client_service_.debug_sync("wsrep_before_certification");
enum wsrep::provider::status enum wsrep::provider::status
cert_ret(provider_.certify(client_state_.id().get(), cert_ret(provider_.certify(client_state_.id().get(),
ws_handle_, ws_handle_,
flags(), flags(),
ws_meta_)); ws_meta_));
client_state_.debug_sync("wsrep_after_certification"); client_service_.debug_sync("wsrep_after_certification");
lock.lock(); lock.lock();
@ -852,7 +852,7 @@ int wsrep::transaction::certify_commit(
// We got BF aborted after succesful certification // We got BF aborted after succesful certification
// and before acquiring client context lock. This means that // and before acquiring client context lock. This means that
// the trasaction must be replayed. // the trasaction must be replayed.
client_state_.will_replay(*this); client_service_.will_replay();
state(lock, s_must_replay); state(lock, s_must_replay);
break; break;
default: default:
@ -878,7 +878,7 @@ int wsrep::transaction::certify_commit(
// yet known. Therefore the transaction must roll back // yet known. Therefore the transaction must roll back
// and go through replay either to replay and commit the whole // and go through replay either to replay and commit the whole
// transaction or to determine failed certification status. // transaction or to determine failed certification status.
client_state_.will_replay(*this); client_service_.will_replay();
if (state() != s_must_abort) if (state() != s_must_abort)
{ {
state(lock, s_must_abort); state(lock, s_must_abort);
@ -906,7 +906,7 @@ int wsrep::transaction::certify_commit(
case wsrep::provider::error_fatal: case wsrep::provider::error_fatal:
client_state_.override_error(wsrep::e_error_during_commit); client_state_.override_error(wsrep::e_error_during_commit);
state(lock, s_must_abort); state(lock, s_must_abort);
client_state_.emergency_shutdown(); client_service_.emergency_shutdown();
break; break;
case wsrep::provider::error_not_implemented: case wsrep::provider::error_not_implemented:
case wsrep::provider::error_not_allowed: case wsrep::provider::error_not_allowed:

View File

@ -29,7 +29,7 @@ namespace wsrep
{ {
if (transaction().active()) if (transaction().active())
{ {
(void)rollback(); (void)client_service().rollback();
} }
} }
private: private: