1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Provider type abstraction, partially completed.

This commit is contained in:
Teemu Ollakka
2018-06-09 23:28:02 +03:00
parent 2cecb3defe
commit e74b214c9c
15 changed files with 646 additions and 263 deletions

31
include/wsrep/buffer.hpp Normal file
View File

@ -0,0 +1,31 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_BUFFER_HPP
#define WSREP_BUFFER_HPP
namespace wsrep
{
class buffer
{
public:
buffer()
: ptr_()
, size_()
{ }
buffer(const void* ptr, size_t size)
: ptr_(ptr)
, size_(size)
{ }
const void* ptr() const { return ptr_; }
size_t size() const { return size_; }
private:
const void* ptr_;
size_t size_;
};
}
#endif // WSREP_BUFFER_HPP

View File

@ -40,6 +40,7 @@
#include "server_context.hpp"
#include "transaction_context.hpp"
#include "client_id.hpp"
#include "mutex.hpp"
#include "lock.hpp"
#include "data.hpp"
@ -68,18 +69,6 @@ namespace wsrep
}
return "unknown";
}
class client_id
{
public:
template <typename I>
client_id(I id)
: id_(static_cast<wsrep_conn_id_t>(id))
{ }
wsrep_conn_id_t get() const { return id_; }
static wsrep_conn_id_t invalid() { return -1; }
private:
wsrep_conn_id_t id_;
};
/*! \class Client Context
*
@ -227,12 +216,11 @@ namespace wsrep
return transaction_.start_transaction(id);
}
int start_transaction(const wsrep_ws_handle_t& wsh,
const wsrep_trx_meta_t& meta,
uint32_t flags)
int start_transaction(const wsrep::ws_handle& wsh,
const wsrep::ws_meta& meta)
{
assert(mode_ == m_applier);
return transaction_.start_transaction(wsh, meta, flags);
return transaction_.start_transaction(wsh, meta);
}
int append_key(const wsrep::key& key)
@ -287,7 +275,7 @@ namespace wsrep
}
int bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep_seqno_t bf_seqno)
wsrep::seqno bf_seqno)
{
return transaction_.bf_abort(lock, bf_seqno);
}
@ -415,7 +403,7 @@ namespace wsrep
* Append SR fragment to the transaction.
*/
virtual int append_fragment(wsrep::transaction_context&,
uint32_t, const wsrep::data&)
int, const wsrep::data&)
{ return 0; }
@ -466,7 +454,7 @@ namespace wsrep
const wsrep::transaction_context&, wsrep::data& data)
{
static const char buf[1] = { 1 };
data.assign(buf, 1);
data = wsrep::data(buf, 1);
return 0;
}

View File

@ -0,0 +1,28 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_CLIENT_ID_HPP
#define WSREP_CLIENT_ID_HPP
namespace wsrep
{
class client_id
{
public:
typedef unsigned long long type;
client_id()
: id_(-1)
{ }
template <typename I>
client_id(I id)
: id_(static_cast<type>(id))
{ }
type get() const { return id_; }
static type invalid() { return -1; }
private:
type id_;
};
}
#endif // WSREP_CLIENT_ID_HPP

View File

@ -13,23 +13,14 @@ namespace wsrep
data()
: buf_()
{
assign(0, 0);
}
data(const void* ptr, size_t len)
: buf_()
: buf_(ptr, len)
{
assign(ptr, len);
}
void assign(const void* ptr, size_t len)
{
buf_.ptr = ptr;
buf_.len = len;
}
const wsrep_buf_t& get() const { return buf_; }
const wsrep::buffer& get() const { return buf_; }
private:
wsrep_buf_t buf_;
wsrep::buffer buf_;
};
}

View File

@ -6,35 +6,42 @@
#define WSREP_KEY_HPP
#include "exception.hpp"
#include "buffer.hpp"
namespace wsrep
{
class key
{
public:
key()
: key_parts_()
, key_()
enum type
{
key_.key_parts = key_parts_;
key_.key_parts_num = 0;
}
shared,
semi_shared,
semi_exclusive,
exclusive
};
key(enum type type)
: type_(type)
, key_parts_()
, key_parts_len_()
{ }
void append_key_part(const void* ptr, size_t len)
{
if (key_.key_parts_num == 3)
if (key_parts_len_ == 3)
{
throw wsrep::runtime_error("key parts exceed maximum of 3");
}
key_parts_[key_.key_parts_num].ptr = ptr;
key_parts_[key_.key_parts_num].len = len;
++key_.key_parts_num;
key_parts_[key_parts_len_] = wsrep::buffer(ptr, len);
++key_parts_len_;
}
const wsrep_key_t& get() const { return key_; }
private:
wsrep_buf_t key_parts_[3];
wsrep_key_t key_;
enum type type_;
wsrep::buffer key_parts_[3];
size_t key_parts_len_;
};
}

View File

@ -5,15 +5,228 @@
#ifndef WSREP_PROVIDER_HPP
#define WSREP_PROVIDER_HPP
// #include "provider_impl.hpp"
#include "key.hpp"
#include "data.hpp"
#include "client_id.hpp"
#include "transaction_id.hpp"
#include <wsrep_api.h>
#include <cassert>
#include <cstring>
#include <string>
#include <vector>
#include <ostream>
namespace wsrep
{
/*! \class seqno
*
* Sequence number type.
*
* By convention, nil value is zero, negative values are not allowed.
* Relation operators are restricted to < and > on purpose to
* enforce correct use.
*/
class seqno
{
public:
seqno()
: seqno_()
{ }
template <typename I>
seqno(I seqno)
: seqno_(seqno)
{
assert(seqno_ >= 0);
}
long long get() const
{
return seqno_;
}
bool nil() const
{
return (seqno_ == 0);
}
bool operator<(seqno other) const
{
return (seqno_ < other.seqno_);
}
bool operator>(seqno other) const
{
return (seqno_ > other.seqno_);
}
seqno operator+(seqno other) const
{
return (seqno_ + other.seqno_);
}
static seqno undefined() { return 0; }
private:
long long seqno_;
};
static inline std::ostream& operator<<(std::ostream& os, wsrep::seqno seqno)
{
return (os << seqno.get());
}
class id
{
public:
enum type
{
none,
string,
uuid
};
id()
: type_(none)
, data_()
{ }
id(const std::string&);
id (const void* data, size_t data_size)
: type_()
, data_()
{
assert(data_size <= 16);
std::memcpy(data_, data, data_size);
}
#if 0
void assign(const void* data, size_t data_size)
{
assert(data_size == 16);
memcpy(data_, data, data_size);
}
#endif
bool operator<(const id& other) const
{
return (std::memcmp(data_, other.data_, sizeof(data_)) < 0);
}
bool operator==(const id& other) const
{
return (std::memcmp(data_, other.data_, sizeof(data_)) == 0);
}
private:
enum type type_;
unsigned char data_[16];
};
class gtid
{
public:
gtid()
: id_()
, seqno_()
{ }
gtid(const wsrep::id& id, wsrep::seqno seqno)
: id_(id)
, seqno_(seqno)
{ }
const wsrep::id& id() const { return id_; }
wsrep::seqno seqno() const { return seqno_ ; }
private:
wsrep::id id_;
wsrep::seqno seqno_;
};
class stid
{
public:
stid()
: server_id_()
, transaction_id_()
, client_id_()
{ }
stid(const wsrep::id& server_id,
wsrep::transaction_id transaction_id,
wsrep::client_id client_id)
: server_id_(server_id)
, transaction_id_(transaction_id)
, client_id_(client_id)
{ }
const wsrep::id& server_id() const { return server_id_; }
wsrep::transaction_id transaction_id() const
{ return transaction_id_; }
wsrep::client_id client_id() const { return client_id_; }
private:
wsrep::id server_id_;
wsrep::transaction_id transaction_id_;
wsrep::client_id client_id_;
};
class ws_handle
{
public:
ws_handle()
: transaction_id_()
, opaque_()
{ }
ws_handle(wsrep::transaction_id id)
: transaction_id_(id)
, opaque_()
{ }
ws_handle(wsrep::transaction_id id,
void* opaque)
: transaction_id_(id)
, opaque_(opaque)
{ }
wsrep::transaction_id transaction_id() const
{ return transaction_id_; }
void* opaque() const { return opaque_; }
private:
wsrep::transaction_id transaction_id_;
void* opaque_;
};
class ws_meta
{
public:
ws_meta()
: gtid_()
, stid_()
, depends_on_()
, flags_()
{ }
ws_meta(const wsrep::gtid& gtid,
const wsrep::stid& stid,
wsrep::seqno depends_on,
int flags)
: gtid_(gtid)
, stid_(stid)
, depends_on_(depends_on)
, flags_(flags)
{ }
wsrep::transaction_id transaction_id() const
{
return stid_.transaction_id();
}
wsrep::seqno seqno() const
{
return gtid_.seqno();
}
private:
wsrep::gtid gtid_;
wsrep::stid stid_;
wsrep::seqno depends_on_;
int flags_;
};
// Abstract interface for provider implementations
class provider
{
@ -33,6 +246,50 @@ namespace wsrep
std::string name_;
std::string value_;
};
/*!
* Return value enumeration
*/
enum status
{
/*! Success */
success,
/*! Warning*/
error_warning,
/*! Transaction was not found from provider */
error_transaction_missing,
/*! Certification failed */
error_certification_failed,
/*! Transaction was BF aborted */
error_bf_abort,
/*! Transaction size exceeded */
error_size_exceeded,
/*! Connectivity to cluster lost */
error_connection_failed,
/*! Internal provider failure, provider must be reinitialized */
error_provider_failed,
/*! Fatal error, server must abort */
error_fatal,
/*! Requested functionality is not implemented by the provider */
error_not_implemented,
/*! Operation is not allowed */
error_not_allowed,
/*! Unknown error code from the provider */
error_unknown
};
struct flag
{
static const int start_transaction = (1 << 0);
static const int commit = (1 << 1);
static const int rollback = (1 << 2);
static const int isolation = (1 << 3);
static const int pa_unsafe = (1 << 4);
static const int commutative = (1 << 5);
static const int native = (1 << 6);
static const int snapshot = (1 << 7);
};
virtual ~provider() { }
// Provider state management
virtual int connect(const std::string& cluster_name,
@ -43,16 +300,16 @@ namespace wsrep
// Applier interface
virtual wsrep_status_t run_applier(void* applier_ctx) = 0;
virtual enum status run_applier(void* applier_ctx) = 0;
// Write set replication
// TODO: Rename to assing_read_view()
virtual int start_transaction(wsrep_ws_handle_t*) = 0;
virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0;
virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0;
virtual wsrep_status_t
certify(wsrep_conn_id_t, wsrep_ws_handle_t*,
uint32_t,
wsrep_trx_meta_t*) = 0;
virtual int start_transaction(wsrep::ws_handle&) = 0;
virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0;
virtual int append_data(wsrep::ws_handle&, const wsrep::data&) = 0;
virtual enum status
certify(wsrep::client_id, wsrep::ws_handle&,
int,
wsrep::ws_meta&) = 0;
/*!
* BF abort a transaction inside provider.
*
@ -63,25 +320,25 @@ namespace wsrep
*
* @return wsrep_status_t
*/
virtual wsrep_status_t bf_abort(wsrep_seqno_t bf_seqno,
wsrep_trx_id_t victim_trx,
wsrep_seqno_t* victim_seqno) = 0;
virtual int rollback(const wsrep_trx_id_t) = 0;
virtual wsrep_status commit_order_enter(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*) = 0;
virtual int commit_order_leave(const wsrep_ws_handle_t*,
const wsrep_trx_meta_t*) = 0;
virtual int release(wsrep_ws_handle_t*) = 0;
virtual enum status bf_abort(wsrep::seqno bf_seqno,
wsrep::transaction_id victim_trx,
wsrep::seqno& victim_seqno) = 0;
virtual int rollback(wsrep::transaction_id) = 0;
virtual enum status commit_order_enter(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
virtual int commit_order_leave(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
virtual int release(wsrep::ws_handle&) = 0;
/*!
* Replay a transaction.
*
* @return Zero in case of success, non-zero on failure.
*/
virtual int replay(wsrep_ws_handle_t* ws_handle, void* applier_ctx) = 0;
virtual int replay(wsrep::ws_handle& ws_handle, void* applier_ctx) = 0;
virtual int sst_sent(const wsrep_gtid_t&, int) = 0;
virtual int sst_received(const wsrep_gtid_t&, int) = 0;
virtual int sst_sent(const wsrep::gtid&, int) = 0;
virtual int sst_received(const wsrep::gtid&, int) = 0;
virtual std::vector<status_variable> status() const = 0;

View File

@ -64,7 +64,7 @@
#include "exception.hpp"
#include "mutex.hpp"
#include "condition_variable.hpp"
#include "wsrep_api.h"
#include "provider.hpp"
#include <string>
#include <vector>
@ -72,7 +72,7 @@
namespace wsrep
{
// Forward declarations
class provider;
// class provider;
class client_context;
class transaction_context;
class view;
@ -292,14 +292,14 @@ namespace wsrep
* \param bypass Boolean bypass flag.
*/
virtual void on_sst_request(const std::string& sst_request,
const wsrep_gtid_t& gtid,
const wsrep::gtid& gtid,
bool bypass) = 0;
/*!
*
*/
void sst_sent(const wsrep_gtid_t& gtid, int error);
void sst_sent(const wsrep::gtid& gtid, int error);
/*!
* This method must be called by the joiner after the SST
@ -307,7 +307,7 @@ namespace wsrep
*
* \param gtid GTID provided by the SST transfer
*/
void sst_received(const wsrep_gtid_t& gtid, int error);
void sst_received(const wsrep::gtid& gtid, int error);
/*!
* This method must be called after the server initialization

View File

@ -7,10 +7,9 @@
#include "provider.hpp"
#include "server_context.hpp"
#include "transaction_id.hpp"
#include "lock.hpp"
#include <wsrep_api.h>
#include <cassert>
#include <vector>
@ -20,22 +19,6 @@ namespace wsrep
class key;
class data;
class transaction_id
{
public:
template <typename I>
transaction_id(I id)
: id_(static_cast<wsrep_trx_id_t>(id))
{ }
wsrep_trx_id_t get() const { return id_; }
static wsrep_trx_id_t invalid() { return wsrep_trx_id_t(-1); }
bool operator==(const transaction_id& other) const
{ return (id_ == other.id_); }
bool operator!=(const transaction_id& other) const
{ return (id_ != other.id_); }
private:
wsrep_trx_id_t id_;
};
class transaction_context
{
@ -75,13 +58,14 @@ namespace wsrep
// fragment succeeded
bool certified() const { return certified_; }
wsrep_seqno_t seqno() const
wsrep::seqno seqno() const
{
return trx_meta_.gtid.seqno;
return ws_meta_.seqno();
}
// Return true if the last fragment was ordered by the
// provider
bool ordered() const { return (trx_meta_.gtid.seqno > 0); }
bool ordered() const
{ return (ws_meta_.seqno().nil() == false); }
bool is_streaming() const
{
@ -91,19 +75,18 @@ namespace wsrep
bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
//
int start_transaction()
{
assert(active() == false);
assert(trx_meta_.stid.trx != transaction_id::invalid());
return start_transaction(trx_meta_.stid.trx);
assert(ws_meta_.transaction_id() != transaction_id::invalid());
return start_transaction(ws_meta_.transaction_id());
}
int start_transaction(const wsrep::transaction_id& id);
int start_transaction(const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta,
uint32_t flags);
int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta);
int append_key(const wsrep::key&);
@ -130,21 +113,21 @@ namespace wsrep
int after_statement();
bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep_seqno_t bf_seqno);
wsrep::seqno bf_seqno);
uint32_t flags() const
int flags() const
{
return flags_;
}
wsrep::mutex& mutex();
wsrep_ws_handle_t& ws_handle() { return ws_handle_; }
// wsrep_ws_handle_t& ws_handle() { return ws_handle_; }
private:
transaction_context(const transaction_context&);
transaction_context operator=(const transaction_context&);
void flags(uint32_t flags) { flags_ = flags; }
void flags(int flags) { flags_ = flags; }
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
void remove_fragments();
@ -159,13 +142,13 @@ namespace wsrep
std::vector<enum state> state_hist_;
enum state bf_abort_state_;
int bf_abort_client_state_;
wsrep_ws_handle_t ws_handle_;
wsrep_trx_meta_t trx_meta_;
uint32_t flags_;
wsrep::ws_handle ws_handle_;
wsrep::ws_meta ws_meta_;
int flags_;
bool pa_unsafe_;
bool certified_;
std::vector<wsrep_gtid_t> fragments_;
std::vector<wsrep::seqno> fragments_;
wsrep::transaction_id rollback_replicated_for_;
};

View File

@ -0,0 +1,46 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_TRANSACTION_ID_HPP
#define WSREP_TRANSACTION_ID_HPP
#include <ostream>
namespace wsrep
{
class transaction_id
{
public:
typedef unsigned long long type;
transaction_id()
: id_(-1)
{ }
template <typename I>
transaction_id(I id)
: id_(static_cast<type>(id))
{ }
type get() const { return id_; }
static unsigned long long invalid() { return type(-1); }
bool operator<(const transaction_id& other) const
{
return (id_ < other.id_);
}
bool operator==(const transaction_id& other) const
{ return (id_ == other.id_); }
bool operator!=(const transaction_id& other) const
{ return (id_ != other.id_); }
private:
type id_;
};
static inline std::ostream& operator<<(std::ostream& os, transaction_id id)
{
return (os << id.get());
}
}
#endif // WSREP_TRANSACTION_ID_HPP