1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Moved public API includes under include/trrep

This commit is contained in:
Teemu Ollakka
2018-05-07 19:40:10 +03:00
parent db5bcb50ef
commit 0988978826
29 changed files with 51 additions and 40 deletions

View File

@ -2,8 +2,8 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "client_context.hpp"
#include "compiler.hpp"
#include "trrep/client_context.hpp"
#include "trrep/compiler.hpp"
#include <sstream>
#include <iostream>

View File

@ -1,531 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
/*! \file client_context.hpp
*
* Client Context
* ==============
*
* This file provides abstraction for integrating DBMS client
* with replication system.
*
* Client Modes
* ============
*
* Local
* -----
*
* Replicating
* -----------
*
* Applier
* --------
*
* Client State
* ============
*
* Client state is mainly relevant for the operation if the Server
* supports synchronous rollback mode only. In this case the transactions
* of the the idle clients (controls is in application which is using the
* DBMS system) which encounter Brute Force Abort (BFA) must be rolled
* back either by applier or a background process. If the client
* state is executing, the control is inside the DBMS system and
* the rollback process should be performed by the client which
* drives the transaction.
*/
#ifndef TRREP_CLIENT_CONTEXT_HPP
#define TRREP_CLIENT_CONTEXT_HPP
#include "server_context.hpp"
#include "transaction_context.hpp"
#include "mutex.hpp"
#include "lock.hpp"
#include "data.hpp"
namespace trrep
{
class server_context;
class provider;
enum client_error
{
e_success,
e_error_during_commit,
e_deadlock_error,
e_append_fragment_error
};
static inline std::string to_string(enum client_error error)
{
switch (error)
{
case e_success: return "success";
case e_error_during_commit: return "error_during_commit";
case e_deadlock_error: return "deadlock_error";
case e_append_fragment_error: return "append_fragment_error";
}
return "unknown";
}
class client_id
{
public:
template <typename I>
client_id(I id)
: id_(static_cast<wsrep_conn_id_t>(id))
{ }
wsrep_conn_id_t get() const { return id_; }
static wsrep_conn_id_t invalid() { return -1; }
private:
wsrep_conn_id_t id_;
};
/*! \class Client Context
*
* Client Contex abstract interface.
*/
class client_context
{
public:
/*!
* Client mode enumeration.
* \todo m_toi total order isolation mode
*/
enum mode
{
/*! Operates in local only mode, no replication. */
m_local,
/*! Generates write sets for replication by the provider. */
m_replicating,
/*! Applies write sets from the provider. */
m_applier
};
/*!
* Client state enumeration.
*
*/
enum state
{
/*!
* Client is idle, the control is in the application which
* uses the DBMS system.
*/
s_idle,
/*!
* The control of the client processing is inside the DBMS
* system.
*/
s_exec,
/*!
* The client session is terminating.
*/
s_quitting
};
const static int state_max_ = s_quitting + 1;
/*!
* Destructor.
*/
virtual ~client_context()
{
assert(transaction_.active() == false);
}
/*!
* Virtual method which should be called before the client
* starts processing the command received from the application.
* This method will wait until the possible synchronous
* rollback for associated transaction has finished.
* The method has a side effect of changing the client
* context state to executing.
*
* If overridden, the implementation should call base
* class method before any implementation specific operations.
*
* \return Zero in case of success, non-zero in case of the
* associated transaction was BF aborted.
*/
virtual int before_command();
/*!
* Virtual method which should be called before returning
* the control back to application which uses the DBMS system.
* This method will check if the transaction associated to
* the connection has been aborted. This method has a side effect
* of changing the client state to idle.
*
* If overridden, the implementation should call base
* class metods after any implementation specifict operations.
*/
virtual void after_command();
/*!
* Before statement execution operations.
*
* Check if server is synced and if dirty reads are allowed.
*
* If the method is overridden by the implementation, base class
* method should be called before any implementation specifc
* operations.
*
* \return Zero in case of success, non-zero if the statement
* is not allowed to be executed due to read or write
* isolation requirements.
*/
virtual int before_statement();
/*!
* After statement execution operations.
*
* * Check for must_replay state
* * Do rollback if requested
*
* If overridden by the implementation, base class method
* should be called after any implementation specific operations.
*/
virtual void after_statement();
int start_transaction(const trrep::transaction_id& id)
{
assert(state_ == s_exec);
return transaction_.start_transaction(id);
}
int start_transaction(const wsrep_ws_handle_t& wsh,
const wsrep_trx_meta_t& meta,
uint32_t flags)
{
assert(mode_ == m_applier);
return transaction_.start_transaction(wsh, meta, flags);
}
int append_key(const trrep::key& key)
{
assert(state_ == s_exec);
return transaction_.append_key(key);
}
int append_data(const trrep::data& data)
{
assert(state_ == s_exec);
return transaction_.append_data(data);
}
int before_prepare()
{
assert(state_ == s_exec);
return transaction_.before_prepare();
}
int after_prepare()
{
assert(state_ == s_exec);
return transaction_.after_prepare();
}
int before_commit()
{
assert(state_ == s_exec);
return transaction_.before_commit();
}
int ordered_commit()
{
assert(state_ == s_exec);
return transaction_.ordered_commit();
}
int after_commit()
{
assert(state_ == s_exec);
return transaction_.after_commit();
}
int before_rollback()
{
assert(state_ == s_exec);
return transaction_.before_rollback();
}
int after_rollback()
{
assert(state_ == s_exec);
return transaction_.after_rollback();
}
/*!
* Get reference to the client mutex.
*
* \return Reference to the client mutex.
*/
trrep::mutex& mutex() { return mutex_; }
/*!
* Get server context associated the the client session.
*
* \return Reference to server context.
*/
trrep::server_context& server_context() const
{ return server_context_; }
/*!
* Get reference to the Provider which is associated
* with the client context.
*
* \return Reference to the provider.
* \throw trrep::runtime_error if no providers are associated
* with the client context.
*/
trrep::provider& provider() const;
/*!
* Get Client identifier.
*
* \return Client Identifier
*/
client_id id() const { return id_; }
/*!
* Get Client mode.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client mode.
*/
enum mode mode() const { return mode_; }
trrep::transaction_context& transaction()
{
return transaction_;
}
void debug_log_level(int level) { debug_log_level_ = level; }
int debug_log_level() const
{
return std::max(debug_log_level_,
server_context_.debug_log_level());
}
void reset_error()
{
current_error_ = trrep::e_success;
}
enum trrep::client_error current_error() const
{
return current_error_;
}
protected:
/*!
* Client context constuctor. This is protected so that it
* can be called from derived class constructors only.
*/
client_context(trrep::mutex& mutex,
trrep::server_context& server_context,
const client_id& id,
enum mode mode)
: mutex_(mutex)
, server_context_(server_context)
, id_(id)
, mode_(mode)
, state_(s_idle)
, transaction_(*this)
, allow_dirty_reads_()
, debug_log_level_(0)
, current_error_(trrep::e_success)
{ }
private:
client_context(const client_context&);
client_context& operator=(client_context&);
/*
* Friend declarations
*/
friend int server_context::on_apply(client_context&,
trrep::transaction_context&,
const trrep::data&);
friend class client_context_switch;
friend class client_applier_mode;
friend class transaction_context;
/*!
* Get Client state.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client state
*/
enum state state() const { return state_; }
/*!
* Set client state.
*/
void state(trrep::unique_lock<trrep::mutex>& lock, enum state state);
/*!
* Virtual method to return true if the client operates
* in two phase commit mode.
*
* \return True if two phase commit is required, false otherwise.
*/
virtual bool do_2pc() const = 0;
/*!
* Append SR fragment to the transaction.
*/
virtual int append_fragment(trrep::transaction_context&,
uint32_t, const trrep::data&)
{ return 0; }
/*!
* This method applies a write set give in data buffer.
* This must be implemented by the DBMS integration.
*
* \return Zero on success, non-zero on applying failure.
*/
virtual int apply(trrep::transaction_context& transaction,
const trrep::data& data) = 0;
/*!
* Virtual method which will be called
* in order to commit the transaction into
* storage engine.
*
* \return Zero on success, non-zero on failure.
*/
virtual int commit(trrep::transaction_context&) = 0;
/*!
* Rollback the transaction.
*
* This metod must be implemented by DBMS integration.
*
* \return Zero on success, no-zero on failure.
*/
virtual int rollback(trrep::transaction_context&) = 0;
/*!
* Notify a implementation that the client is about
* to replay the transaction.
*/
virtual void will_replay(trrep::transaction_context&) = 0;
/*!
* Replay the transaction.
*/
virtual int replay(trrep::transaction_context& tc) = 0;
/*!
* Wait until all of the replaying transactions have been committed.
*/
virtual void wait_for_replayers(trrep::unique_lock<trrep::mutex>&) const = 0;
virtual int prepare_data_for_replication(
const trrep::transaction_context&, trrep::data& data)
{
static const char buf[1] = { 1 };
data.assign(buf, 1);
return 0;
}
/*!
* Return true if the current client operation was killed.
*/
virtual bool killed() const = 0;
/*!
* Abort server operation on fatal error. This should be used
* only for critical conditions which would sacrifice data
* consistency.
*/
virtual void abort() const = 0;
/*!
* Set up thread global variables for client connection.
*/
virtual void store_globals() = 0;
/*!
* Enter debug synchronization point.
*/
virtual void debug_sync(const std::string&) = 0;
/*!
*
*/
virtual void debug_suicide(const std::string&) = 0;
/*!
* Notify the implementation about an error.
*/
virtual void on_error(enum trrep::client_error error) = 0;
/*!
*
*/
void override_error(enum trrep::client_error error)
{
if (current_error_ != trrep::e_success &&
error == trrep::e_success)
{
throw trrep::runtime_error("Overriding error with success");
}
current_error_ = error;
}
trrep::mutex& mutex_;
trrep::server_context& server_context_;
client_id id_;
enum mode mode_;
enum state state_;
trrep::transaction_context transaction_;
/*!
* \todo This boolean should be converted to better read isolation
* semantics.
*/
bool allow_dirty_reads_;
int debug_log_level_;
trrep::client_error current_error_;
};
class client_context_switch
{
public:
client_context_switch(trrep::client_context& orig_context,
trrep::client_context& current_context)
: orig_context_(orig_context)
, current_context_(current_context)
{
current_context_.store_globals();
}
~client_context_switch()
{
orig_context_.store_globals();
}
private:
client_context& orig_context_;
client_context& current_context_;
};
class client_applier_mode
{
public:
client_applier_mode(trrep::client_context& client)
: client_(client)
, orig_mode_(client.mode_)
{
client_.mode_ = trrep::client_context::m_applier;
}
~client_applier_mode()
{
client_.mode_ = orig_mode_;
}
private:
trrep::client_context& client_;
enum trrep::client_context::mode orig_mode_;
};
}
#endif // TRREP_CLIENT_CONTEXT_HPP

View File

@ -1,16 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
/*! \file compiler.hpp
*
* Compiler specific options.
*/
#define TRREP_UNUSED __attribute__((unused))
#if __cplusplus >= 201103L
#define TRREP_OVERRIDE override
#else
#define TRREP_OVERRIDE
#endif // __cplusplus >= 201103L

View File

@ -1,73 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_CONDITION_VARIABLE_HPP
#define TRREP_CONDITION_VARIABLE_HPP
#include "lock.hpp"
#include <cstdlib>
namespace trrep
{
class condition_variable
{
public:
condition_variable() { }
virtual ~condition_variable() { }
virtual void notify_one() = 0;
virtual void notify_all() = 0;
virtual void wait(trrep::unique_lock<trrep::mutex>& lock) = 0;
private:
condition_variable(const condition_variable&);
condition_variable& operator=(const condition_variable&);
};
// Default pthreads based condition variable implementation
class default_condition_variable : public condition_variable
{
public:
default_condition_variable()
: cond_()
{
if (pthread_cond_init(&cond_, 0))
{
throw trrep::runtime_error("Failed to initialized condvar");
}
}
~default_condition_variable()
{
if (pthread_cond_destroy(&cond_))
{
::abort();
}
}
void notify_one()
{
(void)pthread_cond_signal(&cond_);
}
void notify_all()
{
(void)pthread_cond_broadcast(&cond_);
}
void wait(trrep::unique_lock<trrep::mutex>& lock)
{
if (pthread_cond_wait(
&cond_,
reinterpret_cast<pthread_mutex_t*>(lock.mutex().native())))
{
throw trrep::runtime_error("Cond wait failed");
}
}
private:
pthread_cond_t cond_;
};
}
#endif // TRREP_CONDITION_VARIABLE_HPP

View File

@ -1,36 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_DATA_HPP
#define TRREP_DATA_HPP
namespace trrep
{
class data
{
public:
data()
: buf_()
{
assign(0, 0);
}
data(const void* ptr, size_t len)
: buf_()
{
assign(ptr, len);
}
void assign(const void* ptr, size_t len)
{
buf_.ptr = ptr;
buf_.len = len;
}
const wsrep_buf_t& get() const { return buf_; }
private:
wsrep_buf_t buf_;
};
}
#endif // TRREP_DATA_HPP

View File

@ -9,15 +9,15 @@
//
#include "server_context.hpp"
#include "client_context.hpp"
#include "transaction_context.hpp"
#include "key.hpp"
#include "data.hpp"
#include "provider.hpp"
#include "condition_variable.hpp"
#include "view.hpp"
#include "logger.hpp"
#include "trrep/server_context.hpp"
#include "trrep/client_context.hpp"
#include "trrep/transaction_context.hpp"
#include "trrep/key.hpp"
#include "trrep/data.hpp"
#include "trrep/provider.hpp"
#include "trrep/condition_variable.hpp"
#include "trrep/view.hpp"
#include "trrep/logger.hpp"
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>

View File

@ -1,36 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_EXCEPTION_HPP
#define TRREP_EXCEPTION_HPP
#include <stdexcept>
#include <cstdlib>
namespace trrep
{
class runtime_error : public std::runtime_error
{
public:
runtime_error(const std::string& msg)
: std::runtime_error(msg)
{
::abort();
}
};
class not_implemented_error : public std::exception
{
public:
not_implemented_error()
: std::exception()
{
::abort();
}
};
}
#endif // TRREP_EXCEPTION_HPP

View File

@ -1,41 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_KEY_HPP
#define TRREP_KEY_HPP
#include "exception.hpp"
namespace trrep
{
class key
{
public:
key()
: key_parts_()
, key_()
{
key_.key_parts = key_parts_;
key_.key_parts_num = 0;
}
void append_key_part(const void* ptr, size_t len)
{
if (key_.key_parts_num == 3)
{
throw trrep::runtime_error("key parts exceed maximum of 3");
}
key_parts_[key_.key_parts_num].ptr = ptr;
key_parts_[key_.key_parts_num].len = len;
++key_.key_parts_num;
}
const wsrep_key_t& get() const { return key_; }
private:
wsrep_buf_t key_parts_[3];
wsrep_key_t key_;
};
}
#endif // TRREP_KEY_HPP

View File

@ -1,61 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_LOCK_HPP
#define TRREP_LOCK_HPP
#include "mutex.hpp"
#include <cassert>
namespace trrep
{
template <class M>
class unique_lock
{
public:
unique_lock(M& mutex)
: mutex_(mutex)
, locked_(false)
{
mutex_.lock();
locked_ = true;
}
~unique_lock()
{
if (locked_)
{
unlock();
}
}
void lock()
{
mutex_.lock();
assert(locked_ == false);
locked_ = true;
}
void unlock()
{
assert(locked_);
locked_ = false;
mutex_.unlock();
}
bool owns_lock() const
{
return locked_;
}
M& mutex() { return mutex_; }
private:
unique_lock(const unique_lock&);
unique_lock& operator=(const unique_lock&);
M& mutex_;
bool locked_;
};
}
#endif // TRREP_LOCK_HPP

View File

@ -1,5 +1,5 @@
#include "logger.hpp"
#include "trrep/logger.hpp"
#include <iostream>

View File

@ -1,49 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_LOGGER_HPP
#define TRREP_LOGGER_HPP
#include "mutex.hpp"
#include "lock.hpp"
#include <iosfwd>
#include <sstream>
namespace trrep
{
class log
{
public:
log(const std::string& prefix = "INFO")
: prefix_(prefix)
, oss_()
{ }
~log()
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
os_ << prefix_ << ": " << oss_.str() << "\n";
}
template <typename T>
std::ostream& operator<<(const T& val)
{
return (oss_ << val);
}
private:
const std::string prefix_;
std::ostringstream oss_;
static trrep::mutex& mutex_;
static std::ostream& os_;
};
class log_debug : public log
{
public:
log_debug()
: log("DEBUG") { }
};
}
#endif // TRREP_LOGGER_HPP

View File

@ -2,7 +2,7 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "transaction_context.hpp"
#include "trrep/transaction_context.hpp"
#include "mock_client_context.hpp"

View File

@ -5,9 +5,9 @@
#ifndef TRREP_MOCK_CLIENT_CONTEXT_HPP
#define TRREP_MOCK_CLIENT_CONTEXT_HPP
#include "client_context.hpp"
#include "mutex.hpp"
#include "compiler.hpp"
#include "trrep/client_context.hpp"
#include "trrep/mutex.hpp"
#include "trrep/compiler.hpp"
namespace trrep
{

View File

@ -5,7 +5,7 @@
#ifndef TRREP_MOCK_PROVIDER_HPP
#define TRREP_MOCK_PROVIDER_HPP
#include "provider.hpp"
#include "trrep/provider.hpp"
#include <cstring>
#include <map>

View File

@ -5,11 +5,11 @@
#ifndef TRREP_MOCK_SERVER_CONTEXT_HPP
#define TRREP_MOCK_SERVER_CONTEXT_HPP
#include "server_context.hpp"
#include "trrep/server_context.hpp"
#include "mock_client_context.hpp"
#include "mock_provider.hpp"
#include "compiler.hpp"
#include "trrep/compiler.hpp"
namespace trrep
{

View File

@ -3,7 +3,7 @@
//
#include "mock_utils.hpp"
#include "client_context.hpp"
#include "trrep/client_context.hpp"
#include "mock_server_context.hpp"

View File

@ -11,7 +11,7 @@ namespace trrep
class mock_server_context;
}
#include "transaction_context.hpp"
#include "trrep/transaction_context.hpp"
//
// Utility functions

View File

@ -1,77 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_MUTEX_HPP
#define TRREP_MUTEX_HPP
#include "exception.hpp"
#include <pthread.h>
namespace trrep
{
//!
//!
//!
class mutex
{
public:
mutex() { }
virtual ~mutex() { }
virtual void lock() = 0;
virtual void unlock() = 0;
/* Return native handle */
virtual void* native() = 0;
private:
mutex(const mutex& other);
mutex& operator=(const mutex& other);
};
// Default pthread implementation
class default_mutex : public trrep::mutex
{
public:
default_mutex()
: trrep::mutex(),
mutex_()
{
if (pthread_mutex_init(&mutex_, 0))
{
throw trrep::runtime_error("mutex init failed");
}
}
~default_mutex()
{
if (pthread_mutex_destroy(&mutex_))
{
throw trrep::runtime_error("mutex destroy failed");
}
}
void lock()
{
if (pthread_mutex_lock(&mutex_))
{
throw trrep::runtime_error("mutex lock failed");
}
}
void unlock()
{
if (pthread_mutex_unlock(&mutex_))
{
throw trrep::runtime_error("mutex unlock failed");
}
}
void* native()
{
return &mutex_;
}
private:
pthread_mutex_t mutex_;
};
}
#endif // TRREP_MUTEX_HPP

View File

@ -2,7 +2,7 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "provider.hpp"
#include "trrep/provider.hpp"
#include "provider_impl.hpp"

View File

@ -1,92 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_PROVIDER_HPP
#define TRREP_PROVIDER_HPP
// #include "provider_impl.hpp"
#include <wsrep_api.h>
#include <string>
#include <vector>
namespace trrep
{
// Abstract interface for provider implementations
class provider
{
public:
class status_variable
{
public:
status_variable(const std::string& name,
const std::string& value)
: name_(name)
, value_(value)
{ }
const std::string& name() const { return name_; }
const std::string& value() const { return value_; }
private:
std::string name_;
std::string value_;
};
virtual ~provider() { }
// Provider state management
virtual int connect(const std::string& cluster_name,
const std::string& cluster_url,
const std::string& state_donor,
bool bootstrap) = 0;
virtual int disconnect() = 0;
// Applier interface
virtual wsrep_status_t run_applier(void* applier_ctx) = 0;
// Write set replication
// TODO: Rename to assing_read_view()
virtual int start_transaction(wsrep_ws_handle_t*) = 0;
virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0;
virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0;
virtual wsrep_status_t
certify(wsrep_conn_id_t, wsrep_ws_handle_t*,
uint32_t,
wsrep_trx_meta_t*) = 0;
/*!
* BF abort a transaction inside provider.
*
* @param[in] bf_seqno Seqno of the aborter transaction
* @param[in] victim_txt Transaction identifier of the victim
* @param[out] victim_seqno Sequence number of the victim transaction
* or WSREP_SEQNO_UNDEFINED if the victim was not ordered
*
* @return wsrep_status_t
*/
virtual wsrep_status_t bf_abort(wsrep_seqno_t bf_seqno,
wsrep_trx_id_t victim_trx,
wsrep_seqno_t* victim_seqno) = 0;
virtual int rollback(const wsrep_trx_id_t) = 0;
virtual wsrep_status commit_order_enter(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*) = 0;
virtual int commit_order_leave(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*) = 0;
virtual int release(wsrep_ws_handle_t*) = 0;
/*!
* Replay a transaction.
*
* @return Zero in case of success, non-zero on failure.
*/
virtual int replay(wsrep_ws_handle_t* ws_handle, void* applier_ctx) = 0;
virtual int sst_sent(const wsrep_gtid_t&, int) = 0;
virtual int sst_received(const wsrep_gtid_t&, int) = 0;
virtual std::vector<status_variable> status() const = 0;
// Factory method
static provider* make_provider(const std::string& provider);
};
}
#endif // TRREP_PROVIDER_HPP

View File

@ -2,12 +2,12 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "server_context.hpp"
#include "client_context.hpp"
#include "transaction_context.hpp"
#include "view.hpp"
#include "logger.hpp"
#include "compiler.hpp"
#include "trrep/server_context.hpp"
#include "trrep/client_context.hpp"
#include "trrep/transaction_context.hpp"
#include "trrep/view.hpp"
#include "trrep/logger.hpp"
#include "trrep/compiler.hpp"
// Todo: refactor into provider factory
#include "mock_provider.hpp"

View File

@ -1,422 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
/*! \file server_context.hpp
*
* Server Context Abstraction
* ==========================
*
* This file defines an interface for TRRep Server Context.
* The Server Context will encapsulate server identification,
* server state and server capabilities. The class also
* defines an interface for manipulating server state, applying
* of remote transaction write sets, processing SST requests,
* creating local client connections for local storage access
* operations.
*
* Concepts
* ========
*
* State Snapshot Transfer
* -----------------------
*
* Depending on SST type (physical or logical), the server storage
* engine initialization must be done before or after SST happens.
* In case of physical SST method (typically rsync, filesystem snapshot)
* the SST happens before the storage engine is initialized, in case
* of logical backup typically after the storage engine initialization.
*
* Rollback Mode
* -------------
*
* When High Prioity Transaction (HTP) write set is applied, it
* may be required that the HTP Brute Force Aborts (BFA) locally
* executing transaction. As HTP must be able to apply all its
* write sets without interruption, the locally executing transaction
* must yield immediately, otherwise a transaction processing
* may stop or even deadlock. Depending on DBMS implementation,
* the local transaction may need to be rolled back immediately
* (synchronous mode) or the rollback may happen later on
* (asynchronous mode). The Server Context implementation
* which derives from Server Context base class must provide
* the base class the rollback mode which server operates on.
*
* ### Synchronous
*
* If the DBMS server implementation does not allow asynchronous rollback,
* the victim transaction must be rolled back immediately in order to
* allow transaction processing to proceed. Depending on DBMS process model,
* there may be either background thread which processes the rollback
* or the rollback can be done by the HTP applier.
*
* ### Asynchronous
*
* In asynchronous mode the BFA victim transaction is just marked
* to be aborted or in case of fully optimistic concurrency control,
* the conflict is detected at commit.
*
*/
#ifndef TRREP_SERVER_CONTEXT_HPP
#define TRREP_SERVER_CONTEXT_HPP
#include "exception.hpp"
#include "mutex.hpp"
#include "condition_variable.hpp"
#include "wsrep_api.h"
#include <string>
#include <vector>
namespace trrep
{
// Forward declarations
class provider;
class client_context;
class transaction_context;
class view;
class data;
/*! \class Server Context
*
*
*/
class server_context
{
public:
/*!
* Server state enumeration.
*
* \todo Fix UML generation
*
* Server state diagram if the sst_before_init() returns false.
*
* [*] --> disconnected
* disconnected --> initializing
* initializing --> initialized
* initialized --> connected
* connected --> joiner
* joiner --> joined
* joined --> synced
* synced --> donor
* donor --> joined
*
* Server state diagram if the sst_before_init() returns true.
*
* [*] --> disconnected
* disconnected --> connected
* connected --> joiner
* joiner --> initializing
* initializing --> initialized
* initialized --> joined
* joined --> synced
* synced --> donor
* donor --> joined
*/
enum state
{
/*! Server is in disconnected state. */
s_disconnected,
/*! Server is initializing */
s_initializing,
/*! Server has been initialized */
s_initialized,
/*! Server is connected to the cluster */
s_connected,
/*! Server is receiving SST */
s_joiner,
/*! Server has received SST succesfully but has not synced
with rest of the cluster yet. */
s_joined,
/*! Server is donating state snapshot transfer */
s_donor,
/*! Server has synced with the cluster */
s_synced,
/*! Server is disconnecting from group */
s_disconnecting
};
static const int n_states_ = s_disconnecting + 1;
/*!
* Rollback Mode enumeration
*/
enum rollback_mode
{
/*! Asynchronous rollback mode */
rm_async,
/*! Synchronous rollback mode */
rm_sync
};
virtual ~server_context();
/*!
* Return human readable server name.
*
* \return Human readable server name string.
*/
const std::string& name() const { return name_; }
/*!
* Return Server identifier string.
*
* \return Server indetifier string.
*/
const std::string& id() const { return id_; }
/*!
* Return server group communication address.
*
* \return Return server group communication address.
*/
const std::string& address() const { return address_; }
/*!
* Get the rollback mode which server is operating in.
*
* \return Rollback mode.
*/
enum rollback_mode rollback_mode() const { return rollback_mode_; }
/*!
* Create client context which acts only locally, i.e. does
* not participate in replication. However, local client
* connection may execute transactions which require ordering,
* as when modifying local SR fragment storage requires
* strict commit ordering.
*
* \return Pointer to Client Context.
*/
virtual client_context* local_client_context() = 0;
/*!
* Load WSRep provider.
*
* \param provider WSRep provider library to be loaded.
* \param provider_options Provider specific options string
* to be passed for provider during initialization.
*
* \return Zero on success, non-zero on error.
*/
int load_provider(const std::string& provider,
const std::string& provider_options);
/*!
* Return reference to provider.
*
* \return Reference to provider
*
* \throw trrep::runtime_error if provider has not been loaded
*/
virtual trrep::provider& provider() const
{
if (provider_ == 0)
{
throw trrep::runtime_error("provider not loaded");
}
return *provider_;
}
int connect(const std::string& cluster_name,
const std::string& cluster_address,
const std::string& state_donor,
bool bootstrap);
int disconnect();
/*!
* Virtual method which will be called when the server
* has been joined to the cluster. Must be provided by
* the implementation.
*
* \todo Document overriding.
*/
virtual void on_connect();
/*!
* Virtual method which will be called when a view
* notification event has been delivered by the
* provider.
*
* \todo Document overriding.
*
* \params view trrep::view object which holds the new view
* information.
*/
virtual void on_view(const trrep::view& view);
/*!
* Virtual method which will be called when the server
* has been synchronized with the cluster.
*
* \todo Document overriding.
*/
virtual void on_sync();
/*!
* Wait until server reaches given state.
*/
void wait_until_state(trrep::server_context::state) const;
/*!
* Virtual method to return true if the configured SST
* method requires SST to be performed before DBMS storage
* engine initialization, false otherwise.
*/
virtual bool sst_before_init() const = 0;
/*!
* Virtual method which will be called on *joiner* when the provider
* requests the SST request information. This method should
* provide a string containing an information which the donor
* server can use to donate SST.
*/
virtual std::string on_sst_required() = 0;
/*!
* Virtual method which will be called on *donor* when the
* SST request has been delivered by the provider.
* This method should initiate SST transfer or throw
* a trrep::runtime_error
* if the SST transfer cannot be initiated. If the SST request
* initiation is succesful, the server remains in s_donor
* state until the SST is over or fails. The \param bypass
* should be passed to SST implementation. If the flag is true,
* no actual SST should happen, but the joiner server should
* be notified that the donor has seen the request. The notification
* should included \param gtid provided. This must be passed
* to sst_received() call on the joiner.
*
* \todo Figure out better exception for error codition.
*
* \param sst_request SST request string provided by the joiner.
* \param gtid GTID denoting the current replication position.
* \param bypass Boolean bypass flag.
*/
virtual void on_sst_request(const std::string& sst_request,
const wsrep_gtid_t& gtid,
bool bypass) = 0;
/*!
*
*/
void sst_sent(const wsrep_gtid_t& gtid, int error);
/*!
* This method must be called by the joiner after the SST
* transfer has been received.
*
* \param gtid GTID provided by the SST transfer
*/
void sst_received(const wsrep_gtid_t& gtid, int error);
/*!
*
*/
/*!
* This method will be called by the provider hen
* a remote write set is being applied. It is the responsibility
* of the caller to set up transaction context and data properly.
*
* \todo Make this private, allow calls for provider implementations
* only.
* \param client_context Applier client context.
* \param transaction_context Transaction context.
* \param data Write set data
*
* \return Zero on success, non-zero on failure.
*/
int on_apply(trrep::client_context& client_context,
trrep::transaction_context& transaction_context,
const trrep::data& data);
/*!
* This virtual method should be implemented by the DBMS
* to provide information if the current statement in processing
* is allowd for streaming replication.
*
* \return True if the statement is allowed for streaming
* replication, false otherwise.
*/
virtual bool statement_allowed_for_streaming(
const trrep::client_context& client_context,
const trrep::transaction_context& transaction_context) const;
void debug_log_level(int level) { debug_log_level_ = level; }
int debug_log_level() const { return debug_log_level_; }
protected:
/*! Server Context constructor
*
* \param mutex Mutex provided by the DBMS implementation.
* \param name Human Readable Server Name.
* \param id Server Identifier String, UUID or some unique
* identifier.
* \param address Server address in form of IPv4 address, IPv6 address
* or hostname.
* \param working_dir Working directory for replication specific
* data files.
* \param rollback_mode Rollback mode which server operates on.
*/
server_context(trrep::mutex& mutex,
trrep::condition_variable& cond,
const std::string& name,
const std::string& id,
const std::string& address,
const std::string& working_dir,
enum rollback_mode rollback_mode)
: mutex_(mutex)
, cond_(cond)
, state_(s_disconnected)
, state_waiters_(n_states_)
, provider_()
, name_(name)
, id_(id)
, address_(address)
, working_dir_(working_dir)
, rollback_mode_(rollback_mode)
, debug_log_level_(0)
{ }
private:
server_context(const server_context&);
server_context& operator=(const server_context&);
void state(trrep::unique_lock<trrep::mutex>&, enum state);
trrep::mutex& mutex_;
trrep::condition_variable& cond_;
enum state state_;
mutable std::vector<int> state_waiters_;
trrep::provider* provider_;
std::string name_;
std::string id_;
std::string address_;
std::string working_dir_;
enum rollback_mode rollback_mode_;
int debug_log_level_;
};
static inline std::string to_string(enum trrep::server_context::state state)
{
switch (state)
{
case trrep::server_context::s_disconnected: return "disconnected";
case trrep::server_context::s_initializing: return "initilizing";
case trrep::server_context::s_initialized: return "initilized";
case trrep::server_context::s_connected: return "connected";
case trrep::server_context::s_joiner: return "joiner";
case trrep::server_context::s_joined: return "joined";
case trrep::server_context::s_donor: return "donor";
case trrep::server_context::s_synced: return "synced";
case trrep::server_context::s_disconnecting: return "disconnecting";
}
return "unknown";
}
}
#endif // TRREP_SERVER_CONTEXT_HPP

View File

@ -2,13 +2,13 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "transaction_context.hpp"
#include "client_context.hpp"
#include "server_context.hpp"
#include "key.hpp"
#include "data.hpp"
#include "logger.hpp"
#include "compiler.hpp"
#include "trrep/transaction_context.hpp"
#include "trrep/client_context.hpp"
#include "trrep/server_context.hpp"
#include "trrep/key.hpp"
#include "trrep/data.hpp"
#include "trrep/logger.hpp"
#include "trrep/compiler.hpp"
#include <sstream>
#include <memory>

View File

@ -1,194 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_TRANSACTION_CONTEXT_HPP
#define TRREP_TRANSACTION_CONTEXT_HPP
#include "provider.hpp"
#include "server_context.hpp"
#include "lock.hpp"
#include <wsrep_api.h>
#include <cassert>
#include <vector>
namespace trrep
{
class client_context;
class key;
class data;
class transaction_id
{
public:
template <typename I>
transaction_id(I id)
: id_(static_cast<wsrep_trx_id_t>(id))
{ }
wsrep_trx_id_t get() const { return id_; }
static wsrep_trx_id_t invalid() { return wsrep_trx_id_t(-1); }
bool operator==(const transaction_id& other) const
{ return (id_ == other.id_); }
bool operator!=(const transaction_id& other) const
{ return (id_ != other.id_); }
private:
wsrep_trx_id_t id_;
};
class transaction_context
{
public:
enum state
{
s_executing,
s_preparing,
s_certifying,
s_committing,
s_ordered_commit,
s_committed,
s_cert_failed,
s_must_abort,
s_aborting,
s_aborted,
s_must_replay,
s_replaying
};
static const int n_states = s_replaying + 1;
enum state state() const
{ return state_; }
transaction_context(trrep::client_context& client_context);
~transaction_context();
// Accessors
trrep::transaction_id id() const
{ return id_; }
bool active() const
{ return (id_ != trrep::transaction_id::invalid()); }
void state(trrep::unique_lock<trrep::mutex>&, enum state);
// Return true if the certification of the last
// fragment succeeded
bool certified() { return certified_; }
wsrep_seqno_t seqno() const
{
return trx_meta_.gtid.seqno;
}
// Return true if the last fragment was ordered by the
// provider
bool ordered() const { return (trx_meta_.gtid.seqno > 0); }
bool is_streaming() const
{
// Streaming support not yet implemented
return false;
}
bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
//
int start_transaction()
{
assert(active() == false);
assert(trx_meta_.stid.trx != transaction_id::invalid());
return start_transaction(trx_meta_.stid.trx);
}
int start_transaction(const trrep::transaction_id& id);
int start_transaction(const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta,
uint32_t flags);
int append_key(const trrep::key&);
int append_data(const trrep::data&);
int after_row();
int before_prepare();
int after_prepare();
int before_commit();
int ordered_commit();
int after_commit();
int before_rollback();
int after_rollback();
int before_statement();
int after_statement();
bool bf_abort(trrep::unique_lock<trrep::mutex>& lock,
const transaction_context& txc);
uint32_t flags() const
{
return flags_;
}
trrep::mutex& mutex();
wsrep_ws_handle_t& ws_handle() { return ws_handle_; }
private:
transaction_context(const transaction_context&);
transaction_context operator=(const transaction_context&);
void flags(uint32_t flags) { flags_ = flags; }
int certify_fragment(trrep::unique_lock<trrep::mutex>&);
int certify_commit(trrep::unique_lock<trrep::mutex>&);
void remove_fragments();
void clear_fragments();
void cleanup();
void debug_log_state(const std::string&) const;
trrep::provider& provider_;
trrep::client_context& client_context_;
trrep::transaction_id id_;
enum state state_;
std::vector<enum state> state_hist_;
enum state bf_abort_state_;
int bf_abort_client_state_;
wsrep_ws_handle_t ws_handle_;
wsrep_trx_meta_t trx_meta_;
uint32_t flags_;
bool pa_unsafe_;
bool certified_;
std::vector<wsrep_gtid_t> fragments_;
trrep::transaction_id rollback_replicated_for_;
};
static inline std::string to_string(enum trrep::transaction_context::state state)
{
switch (state)
{
case trrep::transaction_context::s_executing: return "executing";
case trrep::transaction_context::s_preparing: return "preparing";
case trrep::transaction_context::s_certifying: return "certifying";
case trrep::transaction_context::s_committing: return "committing";
case trrep::transaction_context::s_ordered_commit: return "ordered_commit";
case trrep::transaction_context::s_committed: return "committed";
case trrep::transaction_context::s_cert_failed: return "cert_failed";
case trrep::transaction_context::s_must_abort: return "must_abort";
case trrep::transaction_context::s_aborting: return "aborting";
case trrep::transaction_context::s_aborted: return "aborted";
case trrep::transaction_context::s_must_replay: return "must_replay";
case trrep::transaction_context::s_replaying: return "replaying";
}
return "unknown";
}
}
#endif // TRREP_TRANSACTION_CONTEXT_HPP

View File

@ -2,10 +2,11 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "transaction_context.hpp"
#include "trrep/transaction_context.hpp"
#include "trrep/provider.hpp"
#include "mock_client_context.hpp"
#include "mock_server_context.hpp"
#include "provider.hpp"
#include "mock_utils.hpp"

View File

@ -1,90 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_VIEW_HPP
#define TRREP_VIEW_HPP
#include <wsrep_api.h>
#include <vector>
namespace trrep
{
class view
{
public:
class member
{
public:
member(const wsrep_member_info_t& member_info)
: id_()
, name_(member_info.name, WSREP_MEMBER_NAME_LEN)
, incoming_(member_info.incoming, WSREP_INCOMING_LEN)
{
char uuid_str[WSREP_UUID_STR_LEN + 1];
wsrep_uuid_print(&member_info.id, uuid_str, sizeof(uuid_str));
id_ = uuid_str;
}
const std::string& id() const { return id_; }
const std::string& name() const { return name_; }
const std::string& incoming() const { return incoming_; }
private:
std::string id_;
std::string name_;
std::string incoming_;
};
view(const wsrep_view_info_t& view_info)
: state_id_(view_info.state_id)
, view_(view_info.view)
, status_(view_info.status)
, capabilities_(view_info.capabilities)
, my_idx_(view_info.my_idx)
, proto_ver_(view_info.proto_ver)
, members_()
{
for (int i(0); i < view_info.memb_num; ++i)
{
members_.push_back(view_info.members[i]);
}
}
wsrep_seqno_t id() const
{ return view_; }
wsrep_view_status_t status() const
{ return status_; }
int own_index() const
{ return my_idx_; }
std::vector<member> members() const
{
std::vector<member> ret;
for (std::vector<wsrep_member_info_t>::const_iterator i(members_.begin());
i != members_.end(); ++i)
{
ret.push_back(member(*i));
}
return ret;
}
//
// Return true if the view is final
//
bool final() const
{
return (members_.empty() && my_idx_ == -1);
}
private:
wsrep_gtid_t state_id_;
wsrep_seqno_t view_;
wsrep_view_status_t status_;
wsrep_cap_t capabilities_;
int my_idx_;
int proto_ver_;
std::vector<wsrep_member_info_t> members_;
};
}
#endif // TRREP_VIEW

View File

@ -3,7 +3,7 @@
//
#include "wsrep_provider_v26.hpp"
#include "exception.hpp"
#include "trrep/exception.hpp"
#include <wsrep_api.h>

View File

@ -5,7 +5,7 @@
#ifndef TRREP_WSREP_PROVIDER_V26_HPP
#define TRREP_WSREP_PROVIDER_V26_HPP
#include "provider.hpp"
#include "trrep/provider.hpp"
#include <wsrep_api.h>