1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Refactored client_service interface out of client_context

This commit is contained in:
Teemu Ollakka
2018-06-14 19:44:38 +03:00
parent 1ca3f7b649
commit 256000f934
12 changed files with 391 additions and 265 deletions

View File

@ -42,6 +42,7 @@
#include "provider.hpp"
#include "transaction_context.hpp"
#include "client_id.hpp"
#include "client_service.hpp"
#include "mutex.hpp"
#include "lock.hpp"
#include "buffer.hpp"
@ -133,6 +134,15 @@ namespace wsrep
assert(transaction_.active() == false);
}
/*!
*
*/
bool do_2pc() const
{
return client_service_.do_2pc();
}
/*!
* Method which should be called before the client
* starts processing the command received from the application.
@ -210,29 +220,42 @@ namespace wsrep
*/
enum after_statement_result after_statement();
//
// Replicating interface
//
int start_transaction(const wsrep::transaction_id& id)
{
assert(state_ == s_exec);
return transaction_.start_transaction(id);
}
int start_transaction(const wsrep::ws_handle& wsh,
const wsrep::ws_meta& meta)
int append_key(const wsrep::key& key)
{
assert(mode_ == m_applier);
return transaction_.start_transaction(wsh, meta);
assert(mode_ == m_replicating);
assert(state_ == s_exec);
return transaction_.append_key(key);
}
int start_replaying(const wsrep::ws_meta& ws_meta)
int append_data(const wsrep::const_buffer& data)
{
assert(mode_ == m_applier);
return transaction_.start_replaying(ws_meta);
assert(mode_ == m_replicating);
assert(state_ == s_exec);
return transaction_.append_data(data);
}
void adopt_transaction(wsrep::transaction_context& transaction)
int prepare_data_for_replication(const wsrep::transaction_context& tc)
{
transaction_.start_transaction(transaction.id());
transaction_.streaming_context_ = transaction.streaming_context_;
return client_service_.prepare_data_for_replication(*this, tc);
}
//
// Streaming interface
//
int after_row()
{
assert(mode_ == m_replicating);
assert(state_ == s_exec);
return transaction_.after_row();
}
int enable_streaming(
@ -240,6 +263,7 @@ namespace wsrep
fragment_unit,
size_t fragment_size)
{
assert(mode_ == m_replicating);
if (transaction_.active() &&
transaction_.streaming_context_.fragment_unit() !=
fragment_unit)
@ -253,24 +277,64 @@ namespace wsrep
fragment_unit, fragment_size);
return 0;
}
int append_key(const wsrep::key& key)
size_t bytes_generated() const
{
assert(state_ == s_exec);
return transaction_.append_key(key);
assert(mode_ == m_replicating);
return client_service_.bytes_generated();
}
int append_data(const wsrep::const_buffer& data)
int prepare_fragment_for_replication(
const wsrep::transaction_context& tc,
wsrep::mutable_buffer& mb)
{
assert(state_ == s_exec);
return transaction_.append_data(data);
return client_service_.prepare_fragment_for_replication(
*this, tc, mb);
}
int after_row()
int append_fragment(const wsrep::transaction_context& tc,
int flags,
const wsrep::const_buffer& buf)
{
assert(state_ == s_exec);
return transaction_.after_row();
return client_service_.append_fragment(tc, flags, buf);
}
/*!
* Remove fragments from the fragment storage. If the
* storage is transactional, this should be done within
* the same transaction which is committing.
*/
void remove_fragments()
{
client_service_.remove_fragments(transaction_);
}
//
// Applying interface
//
int start_transaction(const wsrep::ws_handle& wsh,
const wsrep::ws_meta& meta)
{
assert(mode_ == m_applier);
return transaction_.start_transaction(wsh, meta);
}
int apply(const wsrep::const_buffer& data)
{
assert(mode_ == m_applier);
return client_service_.apply(*this, data);
}
int commit()
{
assert(mode_ == m_applier);
return client_service_.commit(*this,
transaction_.ws_handle(), transaction_.ws_meta());
}
//
// Commit ordering
//
int before_prepare()
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
@ -302,22 +366,96 @@ namespace wsrep
assert(state_ == s_exec);
return transaction_.after_commit();
}
//
// Rollback
//
int rollback()
{
return client_service_.rollback(*this);
}
int before_rollback()
{
assert(state_ == s_idle || state_ == s_exec || state_ == s_result);
return transaction_.before_rollback();
}
int after_rollback()
{
assert(state_ == s_idle || state_ == s_exec || state_ == s_result);
return transaction_.after_rollback();
}
//
// BF aborting
//
int bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep::seqno bf_seqno)
{
assert(mode_ == m_replicating);
return transaction_.bf_abort(lock, bf_seqno);
}
//
// Replaying
//
int start_replaying(const wsrep::ws_meta& ws_meta)
{
assert(mode_ == m_applier);
return transaction_.start_replaying(ws_meta);
}
void adopt_transaction(wsrep::transaction_context& transaction)
{
assert(mode_ == m_applier);
transaction_.start_transaction(transaction.id());
transaction_.streaming_context_ = transaction.streaming_context_;
}
enum wsrep::provider::status replay(
wsrep::transaction_context& tc)
{
return client_service_.replay(tc);
}
//
//
//
void will_replay(const wsrep::transaction_context& tc)
{
client_service_.will_replay(tc);
}
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
{
client_service_.wait_for_replayers(*this, lock);
}
bool interrupted() const
{
return client_service_.interrupted();
}
void emergency_shutdown()
{
client_service_.emergency_shutdown();
}
//
// Debug interface
//
void debug_sync(const char* sync_point)
{
client_service_.debug_sync(*this, sync_point);
}
void debug_crash(const char* crash_point)
{
client_service_.debug_crash(crash_point);
}
/*!
* Get reference to the client mutex.
*
@ -395,11 +533,13 @@ namespace wsrep
*/
client_context(wsrep::mutex& mutex,
wsrep::server_context& server_context,
wsrep::client_service& client_service,
const client_id& id,
enum mode mode)
: thread_id_(wsrep::this_thread::get_id())
, mutex_(mutex)
, server_context_(server_context)
, client_service_(client_service)
, id_(id)
, mode_(mode)
, state_(s_idle)
@ -431,117 +571,12 @@ namespace wsrep
*/
void state(wsrep::unique_lock<wsrep::mutex>& lock, enum state state);
virtual bool is_autocommit() const = 0;
/*!
* Virtual method to return true if the client operates
* in two phase commit mode.
*
* \return True if two phase commit is required, false otherwise.
*/
virtual bool do_2pc() const = 0;
/*!
* Append SR fragment to the transaction.
*/
virtual int append_fragment(const wsrep::transaction_context&,
int, const wsrep::const_buffer&) = 0;
virtual void remove_fragments(const wsrep::transaction_context&) = 0;
/*!
* This method applies a write set give in data buffer.
* This must be implemented by the DBMS integration.
*
* \return Zero on success, non-zero on applying failure.
*/
virtual int apply(const wsrep::const_buffer& data) = 0;
/*!
* Virtual method which will be called
* in order to commit the transaction into
* storage engine.
*
* \return Zero on success, non-zero on failure.
*/
virtual int commit() = 0;
/*!
* Rollback the transaction.
*
* This metod must be implemented by DBMS integration.
*
* \return Zero on success, no-zero on failure.
*/
virtual int rollback() = 0;
/*!
* Notify a implementation that the client is about
* to replay the transaction.
*/
virtual void will_replay(wsrep::transaction_context&) = 0;
/*!
* Replay the transaction.
*/
virtual enum wsrep::provider::status
replay(wsrep::transaction_context& tc) = 0;
/*!
* Wait until all of the replaying transactions have been committed.
*/
virtual void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) = 0;
virtual int prepare_data_for_replication(
const wsrep::transaction_context&) = 0;
virtual size_t bytes_generated() const = 0;
virtual int prepare_fragment_for_replication(
const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0;
/*!
* Return true if the current client operation was killed.
*/
virtual bool killed() const = 0;
/*!
* Abort server operation on fatal error. This should be used
* only for critical conditions which would sacrifice data
* consistency.
*/
virtual void abort() = 0;
public:
/*!
* Set up thread global variables for client connection.
*/
virtual void store_globals()
{
thread_id_ = wsrep::this_thread::get_id();
}
private:
/*!
* Enter debug synchronization point.
*/
virtual void debug_sync(const char*) = 0;
/*!
*
*/
virtual void debug_suicide(const char*) = 0;
/*!
* Notify the implementation about an error.
*/
virtual void on_error(enum wsrep::client_error error) = 0;
/*!
*
*/
void override_error(enum wsrep::client_error error);
wsrep::thread::id thread_id_;
wsrep::mutex& mutex_;
wsrep::server_context& server_context_;
wsrep::client_service& client_service_;
client_id id_;
enum mode mode_;
enum state state_;
@ -566,11 +601,11 @@ namespace wsrep
: orig_context_(orig_context)
, current_context_(current_context)
{
current_context_.store_globals();
current_context_.client_service_.store_globals();
}
~client_context_switch()
{
orig_context_.store_globals();
orig_context_.client_service_.store_globals();
}
private:
client_context& orig_context_;

View File

@ -9,16 +9,22 @@
* which will be called by the wsrep-lib under certain circumstances,
* for example when a transaction rollback is required by internal
* wsrep-lib operation or applier client needs to apply a write set.
*
* \todo Figure out better name for this interface.
*/
#include "buffer.hpp"
namespace wsrep
{
class client_service
{
public:
client_service(wsrep::provider& provider)
: provider_(provider) { }
/*!
*
*/
virtual bool is_autocommit() const = 0;
/*!
* Return true if two pahase commit is required for transaction
* to commit.
@ -26,7 +32,8 @@ namespace wsrep
virtual bool do_2pc() const = 0;
/*!
* Return true if the current transaction has been interrupted.
* Return true if the current transaction has been interrupted
* by the DBMS.
*/
virtual bool interrupted() const = 0;
@ -45,28 +52,30 @@ namespace wsrep
/*!
* Set up a data for replication.
*/
virtual int prepare_data_for_replication(wsrep::data& data) = 0;
virtual int prepare_data_for_replication(wsrep::client_context&, const wsrep::transaction_context&) = 0;
//
// Streaming
//
virtual size_t bytes_generated() const = 0;
virtual int prepare_fragment_for_replication(
wsrep::client_context&, const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0;
virtual void remove_fragments(const wsrep::transaction_context&) = 0;
/*!
* Apply a write set.
*/
virtual int apply() = 0;
/*!
* Prepare a transaction for commit.
*/
virtual int prepare() = 0;
virtual int apply(wsrep::client_context&, const wsrep::const_buffer&) = 0;
/*!
* Commit transaction.
*/
virtual int commit() = 0;
virtual int commit(wsrep::client_context&, const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
/*!
* Roll back transaction.
*/
virtual int rollback() = 0;
virtual int rollback(wsrep::client_context&) = 0;
/*!
* Forcefully shut down the DBMS process or replication system.
@ -84,7 +93,7 @@ namespace wsrep
* \todo This should not be visible to DBMS level, should be
* handled internally by wsrep-lib.
*/
virtual void will_replay() = 0;
virtual void will_replay(const wsrep::transaction_context&) = 0;
/*!
* Replay the current transaction. The implementation must put
@ -94,7 +103,7 @@ namespace wsrep
* \todo This should not be visible to DBMS level, should be
* handled internally by wsrep-lib.
*/
virtual int replay() = 0;
virtual enum wsrep::provider::status replay(wsrep::transaction_context&) = 0;
/*!
* Wait until all replaying transactions have been finished
@ -103,13 +112,32 @@ namespace wsrep
* \todo This should not be visible to DBMS level, should be
* handled internally by wsrep-lib.
*/
virtual void wait_for_replayers() = 0;
virtual void wait_for_replayers(wsrep::client_context&, wsrep::unique_lock<wsrep::mutex>&) = 0;
// Streaming replication
/*!
* Append a write set fragment into fragment storage.
*/
virtual int append_fragment() = 0;
virtual int append_fragment(const wsrep::transaction_context&, int flag, const wsrep::const_buffer&) = 0;
//
// Debug interface
//
/*!
* Enter debug sync point.
*
* @params sync_point Name of the debug sync point.
*/
virtual void debug_sync(wsrep::client_context&, const char* sync_point) = 0;
/*!
* Forcefully kill the process if the crash_point has
* been enabled.
*/
virtual void debug_crash(const char* crash_point) = 0;
protected:
wsrep::provider& provider_;
};
/*!
@ -119,18 +147,6 @@ namespace wsrep
class client_debug_callback
{
public:
/*!
* Enter debug sync point.
*
* @params sync_point Name of the debug sync point.
*/
void debug_sync(const char* sync_point) = 0;
/*!
* Forcefully kill the process if the suicide_point has
* been enabled.
*/
void debug_suicide(const char* suicide_point) = 0;
};

View File

@ -241,7 +241,9 @@ namespace wsrep
/*!
* Replay a transaction.
*
* @return Zero in case of success, non-zero on failure.
* \todo Inspect if the ws_handle could be made const
*
* \return Zero in case of success, non-zero on failure.
*/
virtual enum status replay(
wsrep::ws_handle& ws_handle, void* applier_ctx) = 0;