1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Imported initial implementation

This commit is contained in:
Teemu Ollakka
2018-04-16 14:37:58 +03:00
commit 3b428ff0b7
19 changed files with 1954 additions and 0 deletions

21
src/CMakeLists.txt Normal file
View File

@ -0,0 +1,21 @@
#
# Copyright (C) 2018 Codership Oy <info@codership.com>
#
add_library(trrep
provider.cpp
client_context.cpp
transaction_context.cpp)
add_executable(trrep_test
transaction_context_test.cpp
trrep_test.cpp
)
target_link_libraries(trrep_test trrep)
add_test(NAME trrep_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/trrep_test
)

16
src/client_context.cpp Normal file
View File

@ -0,0 +1,16 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "client_context.hpp"
#include "transaction_context.hpp"
// TODO: This should be pure virtual method, implemented by
// DBMS integration or mock classes only.
int trrep::client_context::replay(trrep::transaction_context& tc)
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
tc.state(lock, trrep::transaction_context::s_replaying);
tc.state(lock, trrep::transaction_context::s_committed);
return 0;
}

122
src/client_context.hpp Normal file
View File

@ -0,0 +1,122 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_CLIENT_CONTEXT_HPP
#define TRREP_CLIENT_CONTEXT_HPP
#include "provider.hpp"
#include "mutex.hpp"
#include "lock.hpp"
#include "data.hpp"
namespace trrep
{
class server_context;
class transaction_context;
enum client_error
{
e_success,
e_error_during_commit,
e_deadlock_error
};
class client_id
{
public:
template <typename I>
client_id(I id)
: id_(static_cast<wsrep_conn_id_t>(id))
{ }
wsrep_conn_id_t get() const { return id_; }
static wsrep_conn_id_t invalid() { return -1; }
private:
wsrep_conn_id_t id_;
};
class client_context
{
public:
enum mode
{
m_local,
m_applier
};
enum state
{
s_idle,
s_exec,
s_quitting
};
client_context(trrep::server_context& server_context,
client_id id,
enum mode mode)
: mutex_()
, server_context_(server_context)
, id_(id)
, mode_(mode)
, state_(s_idle)
{ }
virtual ~client_context() { }
// Accessors
trrep::mutex& mutex() { return mutex_; }
const trrep::server_context& server_context() const
{ return server_context_; }
client_id id() const { return id_; }
enum mode mode() const { return mode_; }
enum state state() const { return state_; }
//
//
//
virtual void before_command() { }
virtual void after_command() { }
virtual int before_statement() { return 0; }
virtual int after_statement() { return 0; }
virtual int rollback(trrep::transaction_context&) { return 0; }
virtual void will_replay(trrep::transaction_context&) { }
virtual int replay(trrep::transaction_context& tc);
virtual void wait_for_replayers(trrep::unique_lock<trrep::mutex>&)
{ }
virtual int prepare_data_for_replication(
const trrep::transaction_context&, trrep::data& data)
{
static const char buf[1] = { 1 };
data.assign(buf, 1);
return 0;
}
virtual void override_error(const trrep::client_error&) { }
virtual bool killed() const { return 0; }
virtual void abort() const { ::abort(); }
// Debug helpers
virtual void debug_sync(const std::string&)
{
}
virtual void debug_suicide(const std::string&)
{
abort();
}
private:
void state(enum state state);
trrep::mutex mutex_;
trrep::server_context& server_context_;
client_id id_;
enum mode mode_;
enum state state_;
};
}
#endif // TRREP_CLIENT_CONTEXT_HPP

36
src/data.hpp Normal file
View File

@ -0,0 +1,36 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_DATA_HPP
#define TRREP_DATA_HPP
namespace trrep
{
class data
{
public:
data()
: buf_()
{
assign(0, 0);
}
data(const void* ptr, size_t len)
: buf_()
{
assign(ptr, len);
}
void assign(const void* ptr, size_t len)
{
buf_.ptr = ptr;
buf_.len = len;
}
const wsrep_buf_t& get() const { return buf_; }
private:
wsrep_buf_t buf_;
};
}
#endif // TRREP_DATA_HPP

31
src/exception.hpp Normal file
View File

@ -0,0 +1,31 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_EXCEPTION_HPP
#define TRREP_EXCEPTION_HPP
#include <stdexcept>
namespace trrep
{
class runtime_error : public std::runtime_error
{
public:
runtime_error(const std::string& msg)
: std::runtime_error(msg)
{ }
};
class not_implemented_error : public std::exception
{
public:
not_implemented_error()
: std::exception()
{ }
};
}
#endif // TRREP_EXCEPTION_HPP

41
src/key.hpp Normal file
View File

@ -0,0 +1,41 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_KEY_HPP
#define TRREP_KEY_HPP
#include "exception.hpp"
namespace trrep
{
class key
{
public:
key()
: key_parts_()
, key_()
{
key_.key_parts = key_parts_;
key_.key_parts_num = 0;
}
void append_key_part(const void* ptr, size_t len)
{
if (key_.key_parts_num == 3)
{
throw trrep::runtime_error("key parts exceed maximum of 3");
}
key_parts_[key_.key_parts_num].ptr = ptr;
key_parts_[key_.key_parts_num].len = len;
++key_.key_parts_num;
}
const wsrep_key_t& get() const { return key_; }
private:
wsrep_buf_t key_parts_[3];
wsrep_key_t key_;
};
}
#endif // TRREP_KEY_HPP

57
src/lock.hpp Normal file
View File

@ -0,0 +1,57 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_LOCK_HPP
#define TRREP_LOCK_HPP
#include "mutex.hpp"
#include <cassert>
namespace trrep
{
template <class M>
class unique_lock
{
public:
unique_lock(M& mutex)
: mutex_(mutex)
, locked_(false)
{
mutex_.lock();
locked_ = true;
}
~unique_lock()
{
assert(locked_);
mutex_.unlock();
}
void lock()
{
mutex_.lock();
assert(locked_ == false);
locked_ = true;
}
void unlock()
{
assert(locked_);
locked_ = false;
mutex_.unlock();
}
bool owns_lock() const
{
return locked_;
}
private:
unique_lock(const unique_lock&);
unique_lock& operator=(const unique_lock&);
M& mutex_;
bool locked_;
};
}
#endif // TRREP_LOCK_HPP

135
src/mock_provider_impl.hpp Normal file
View File

@ -0,0 +1,135 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_MOCK_PROVIDER_IMPL_HPP
#define TRREP_MOCK_PROVIDER_IMPL_HPP
#include "provider_impl.hpp"
#include <cstring>
#include <map>
#include <iostream> // todo: proper logging
namespace trrep
{
class mock_provider_impl : public trrep::provider_impl
{
public:
typedef std::map<std::pair<wsrep_conn_id_t, wsrep_trx_id_t>, wsrep_seqno_t > bf_abort_map;
mock_provider_impl()
: group_id_()
, node_id_()
, group_seqno_(0)
, bf_abort_map_()
{
memset(&group_id_, 0, sizeof(group_id_));
group_id_.data[0] = 1;
memset(&node_id_, 0, sizeof(node_id_));
node_id_.data[0] = 1;
}
// Provider implemenatation interface
int start_transaction(wsrep_ws_handle_t*) { return 0; }
wsrep_status
certify_commit(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* ws_handle,
uint32_t flags,
wsrep_trx_meta_t* trx_meta)
{
assert(flags | WSREP_FLAG_TRX_END);
if ((flags | WSREP_FLAG_TRX_END) == 0)
{
return WSREP_FATAL;
}
std::cerr << "certify_commit: " << conn_id << ":"
<< ws_handle->trx_id << "\n";
trx_meta->stid.node = node_id_;
trx_meta->stid.trx = ws_handle->trx_id;
trx_meta->stid.conn = conn_id;
std::pair<wsrep_conn_id_t, wsrep_trx_id_t>
trx_id(conn_id, ws_handle->trx_id);
bf_abort_map::iterator it(bf_abort_map_.find(trx_id));
std::cerr << "bf aborted: "
<< (it == bf_abort_map_.end() ? "no" : "yes") << "\n";
if (it == bf_abort_map_.end())
{
++group_seqno_;
trx_meta->gtid.uuid = group_id_;
trx_meta->gtid.seqno = group_seqno_;
trx_meta->depends_on = group_seqno_ - 1;
std::cerr << "certify_commit return: " << WSREP_OK << "\n";
return WSREP_OK;
}
else
{
wsrep_status_t ret;
if (it->second == WSREP_SEQNO_UNDEFINED)
{
trx_meta->gtid.uuid = WSREP_UUID_UNDEFINED;
trx_meta->gtid.seqno = WSREP_SEQNO_UNDEFINED;
trx_meta->depends_on = WSREP_SEQNO_UNDEFINED;
ret = WSREP_TRX_FAIL;
}
else
{
++group_seqno_;
trx_meta->gtid.uuid = group_id_;
trx_meta->gtid.seqno = group_seqno_;
trx_meta->depends_on = group_seqno_ - 1;
ret = WSREP_BF_ABORT;
}
bf_abort_map_.erase(it);
std::cerr << "certify_commit return: " << ret << "\n";
return ret;
}
}
int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; }
int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; }
int rollback(const wsrep_trx_id_t) { return 0; }
wsrep_status_t commit_order_enter(wsrep_ws_handle_t*)
{ return WSREP_OK; }
int commit_order_leave(wsrep_ws_handle_t*) { return 0;}
int release(wsrep_ws_handle_t*) { return 0; }
// Methods to modify mock state
// Inject BF abort event into the provider.
//
// If bf_seqno equals to WSREP_SEQNO_UNDEFINED, the BF abort
// is done without ordering and the provider will return
// WSREP_TRX_FAIL to the aborted transaction. Otherwise
// WSREP_BF_ABORT is returned to the aborted transaction.
//
// @param conn_id Connection/client identifier to be aborted
// @param trx_id Trx id to be aborted
// @param bf_seqno Aborter sequence number
//
void bf_abort(wsrep_conn_id_t conn_id,
wsrep_trx_id_t trx_id,
wsrep_seqno_t bf_seqno)
{
std::cerr << "bf_abort: " << conn_id << ":" << trx_id << "\n";
assert(bf_seqno == WSREP_SEQNO_UNDEFINED ||
group_seqno_ < bf_seqno);
bf_abort_map_.insert(std::make_pair(std::make_pair(conn_id, trx_id),
bf_seqno));
if (bf_seqno != WSREP_SEQNO_UNDEFINED)
{
group_seqno_ = bf_seqno;
}
}
private:
wsrep_uuid_t group_id_;
wsrep_uuid_t node_id_;
wsrep_seqno_t group_seqno_;
bf_abort_map bf_abort_map_;
};
}
#endif // TRREP_MOCK_PROVIDER_IMPL_HPP

57
src/mutex.hpp Normal file
View File

@ -0,0 +1,57 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_MUTEX_HPP
#define TRREP_MUTEX_HPP
#include "exception.hpp"
#include <pthread.h>
namespace trrep
{
// Default pthread implementation
class mutex
{
public:
mutex()
: mutex_()
{
if (pthread_mutex_init(&mutex_, 0))
{
throw trrep::runtime_error("mutex init failed");
}
}
~mutex()
{
if (pthread_mutex_destroy(&mutex_))
{
throw trrep::runtime_error("mutex destroy failed");
}
}
void lock()
{
if (pthread_mutex_lock(&mutex_))
{
throw trrep::runtime_error("mutex lock failed");
}
}
void unlock()
{
if (pthread_mutex_unlock(&mutex_))
{
throw trrep::runtime_error("mutex unlock failed");
}
}
private:
mutex(const mutex& other);
mutex& operator=(const mutex& other);
pthread_mutex_t mutex_;
};
}
#endif // TRREP_MUTEX_HPP

13
src/provider.cpp Normal file
View File

@ -0,0 +1,13 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "provider.hpp"
#include "provider_impl.hpp"
trrep::provider* trrep::provider::make_provider(
const std::string&)
{
return 0;
}

52
src/provider.hpp Normal file
View File

@ -0,0 +1,52 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_PROVIDER_HPP
#define TRREP_PROVIDER_HPP
#include "provider_impl.hpp"
#include <wsrep_api.h>
#include <string>
namespace trrep
{
class provider
{
public:
// Construct a provider with give provider implementation
//
provider(provider_impl* impl) : impl_(impl) { }
int start_transaction(wsrep_ws_handle_t* wsh)
{ return impl_->start_transaction(wsh); }
int append_key(wsrep_ws_handle_t* wsh, const wsrep_key_t* key)
{ return impl_->append_key(wsh, key); }
int append_data(wsrep_ws_handle_t* wsh, const wsrep_buf_t* buf)
{ return impl_->append_data(wsh, buf); }
wsrep_status certify_commit(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* wsh,
uint32_t flags, wsrep_trx_meta_t* trx_meta)
{ return impl_->certify_commit(conn_id, wsh, flags, trx_meta); }
int rollback(const wsrep_trx_id_t trx_id)
{ return impl_->rollback(trx_id); }
wsrep_status commit_order_enter(wsrep_ws_handle_t* wsh)
{ return impl_->commit_order_enter(wsh); }
int commit_order_leave(wsrep_ws_handle_t* wsh)
{ return impl_->commit_order_leave(wsh); }
int release(wsrep_ws_handle_t* wsh)
{ return impl_->release(wsh); }
// Load the provider
// @param Path to provider library or "none" to load
// dummy provider
static provider* make_provider(const std::string& provider);
private:
provider_impl* impl_;
};
}
#endif // TRREP_PROVIDER_HPP

31
src/provider_impl.hpp Normal file
View File

@ -0,0 +1,31 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_PROVIDER_IMPL_HPP
#define TRREP_PROVIDER_IMPL_HPP
#include <wsrep_api.h>
namespace trrep
{
// Abstract interface for provider implementations
class provider_impl
{
public:
virtual int start_transaction(wsrep_ws_handle_t*) = 0;
virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0;
virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0;
virtual wsrep_status
certify_commit(wsrep_conn_id_t, wsrep_ws_handle_t*,
uint32_t,
wsrep_trx_meta_t*) = 0;
virtual int rollback(const wsrep_trx_id_t) = 0;
virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0;
virtual int commit_order_leave(wsrep_ws_handle_t*) = 0;
virtual int release(wsrep_ws_handle_t*) = 0;
};
}
#endif // TRREP_PROVIDER_IMPL_HPP

56
src/server_context.hpp Normal file
View File

@ -0,0 +1,56 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_SERVER_CONTEXT_HPP
#define TRREP_SERVER_CONTEXT_HPP
#include <string>
namespace trrep
{
// Forward declarations
class transaction_context;
class server_context
{
public:
enum rollback_mode
{
rm_async,
rm_sync
};
server_context(const std::string& name,
const std::string& id,
enum rollback_mode rollback_mode)
: name_(name)
, id_(id)
, rollback_mode_(rollback_mode)
{ }
const std::string& name() const { return name_; }
const std::string& id() const { return id_; }
virtual void on_connect() { }
virtual void on_view() { }
virtual void on_sync() { }
virtual void on_apply(trrep::transaction_context&) { }
virtual void on_commit(trrep::transaction_context&) { }
virtual bool statement_allowed_for_streaming(
const trrep::client_context&,
const trrep::transaction_context&) const
{
// Streaming not implemented yet
return false;
}
private:
std::string name_;
std::string id_;
enum rollback_mode rollback_mode_;
};
}
#endif // TRREP_SERVER_CONTEXT_HPP

517
src/transaction_context.cpp Normal file
View File

@ -0,0 +1,517 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "transaction_context.hpp"
#include "client_context.hpp"
#include "server_context.hpp"
#include "key.hpp"
#include "data.hpp"
#include <iostream> // TODO: replace with proper logging utility
#include <sstream>
// Public
int trrep::transaction_context::append_key(const trrep::key& key)
{
return provider_.append_key(&ws_handle_, &key.get());
}
int trrep::transaction_context::append_data(const trrep::data& data)
{
return provider_.append_data(&ws_handle_, &data.get());
}
int trrep::transaction_context::before_prepare()
{
int ret(0);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(client_context_.mode() == trrep::client_context::m_local);
assert(state() == s_executing || state() == s_must_abort);
if (state() == s_must_abort)
{
return 1;
}
if (is_streaming())
{
client_context_.debug_suicide(
"crash_last_fragment_commit_before_fragment_removal");
lock.unlock();
if (client_context_.server_context().statement_allowed_for_streaming(
client_context_, *this))
{
client_context_.override_error(trrep::e_error_during_commit);
ret = 1;
}
else
{
remove_fragments();
}
lock.lock();
client_context_.debug_suicide(
"crash_last_fragment_commit_after_fragment_removal");
}
assert(client_context_.mode() == trrep::client_context::m_local);
assert(state() == s_executing);
return ret;
}
int trrep::transaction_context::after_prepare()
{
int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(client_context_.mode() == trrep::client_context::m_local);
assert(state() == s_executing || state() == s_must_abort);
if (state() == s_must_abort)
{
return 1;
}
if (state() == s_executing)
{
ret = certify_commit(lock);
assert((ret == 0 || state() == s_committing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
}
else
{
assert(state() == s_must_abort);
client_context_.override_error(trrep::e_deadlock_error);
}
assert(client_context_.mode() == trrep::client_context::m_local);
return ret;
}
int trrep::transaction_context::before_commit()
{
int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
switch (client_context_.mode())
{
case trrep::client_context::m_local:
// Commit is one phase - before/after prepare was not called
if (state() == s_executing)
{
ret = certify_commit(lock);
assert((ret == 0 || state() == s_committing)
||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
}
else if (state() != s_committing)
{
assert(state() == s_must_abort);
client_context_.override_error(trrep::e_deadlock_error);
}
else
{
// 2PC commit, prepare was done before
ret = 0;
}
if (ret == 0)
{
lock.unlock();
switch(provider_.commit_order_enter(&ws_handle_))
{
case WSREP_OK:
break;
case WSREP_BF_ABORT:
state(lock, s_must_abort);
ret = 1;
break;
default:
ret = 1;
assert(0);
break;
}
lock.lock();
}
break;
case trrep::client_context::m_applier:
ret = provider_.commit_order_enter(&ws_handle_);
if (ret)
{
state(lock, s_must_abort);
}
break;
}
return ret;
}
int trrep::transaction_context::ordered_commit()
{
int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_committing);
ret = provider_.commit_order_leave(&ws_handle_);
// Should always succeed
assert(ret == 0);
state(lock, s_ordered_commit);
return ret;
}
int trrep::transaction_context::after_commit()
{
int ret(0);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_ordered_commit);
switch (client_context_.mode())
{
case trrep::client_context::m_local:
if (is_streaming())
{
clear_fragments();
}
ret = provider_.release(&ws_handle_);
break;
case trrep::client_context::m_applier:
break;
}
assert(ret == 0);
state(lock, s_committed);
return ret;
}
int trrep::transaction_context::before_rollback()
{
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_executing ||
state() == s_must_abort ||
state() == s_cert_failed ||
state() == s_must_replay);
switch (state())
{
case s_executing:
// Voluntary rollback
if (is_streaming())
{
// Replicate rollback fragment
provider_.rollback(id_.get());
}
state(lock, s_aborting);
break;
case s_must_abort:
if (certified())
{
state(lock, s_must_replay);
}
else
{
if (is_streaming())
{
// Replicate rollback fragment
provider_.rollback(id_.get());
}
state(lock, s_aborting);
}
break;
case s_cert_failed:
if (is_streaming())
{
// Replicate rollback fragment
provider_.rollback(id_.get());
}
state(lock, s_aborting);
break;
case s_must_replay:
break;
default:
assert(0);
break;
}
return 0;
}
int trrep::transaction_context::after_rollback()
{
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_aborting ||
state() == s_must_replay);
if (state() == s_aborting)
{
state(lock, s_aborted);
}
// Releasing the transaction from provider is postponed into
// after_statement() hook. Depending on DBMS system all the
// resources acquired by transaction may or may not be released
// during actual rollback. If the transaction has been ordered,
// releasing the commit ordering critical section should be
// also postponed until all resources have been released.
return 0;
}
int trrep::transaction_context::after_statement()
{
int ret(0);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_executing ||
state() == s_committed ||
state() == s_aborted ||
state() == s_must_abort ||
state() == s_must_replay);
switch (state())
{
case s_executing:
// ?
break;
case s_committed:
if (is_streaming())
{
state(lock, s_executing);
}
break;
case s_must_abort:
ret = client_context_.rollback(*this);
break;
case s_aborted:
break;
case s_must_replay:
ret = client_context_.replay(*this);
break;
default:
assert(0);
break;
}
assert(state() == s_executing ||
state() == s_committed ||
state() == s_aborted);
if (state() == s_aborted)
{
if (ordered())
{
ret = provider_.commit_order_enter(&ws_handle_);
if (ret == 0) provider_.commit_order_leave(&ws_handle_);
}
provider_.release(&ws_handle_);
}
if (state() != s_executing)
{
cleanup();
}
return ret;
}
// Private
void trrep::transaction_context::state(
trrep::unique_lock<trrep::mutex>& lock __attribute__((unused)),
enum trrep::transaction_context::state next_state)
{
assert(lock.owns_lock());
static const char allowed[n_states][n_states] =
{ /* ex ce co oc ct cf ma ab ad mr re from/to */
{ 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* ex */
{ 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
{ 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
{ 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
{ 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
{ 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */
{ 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
{ 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */
};
if (allowed[state_][next_state])
{
std::cerr << "state transition: " << trrep::to_string(state_)
<< " -> " << trrep::to_string(next_state)
<< "\n";
state_hist_.push_back(state_);
state_ = next_state;
}
else
{
std::ostringstream os;
os << "unallowed state transition for transaction "
<< id_.get() << ": " << trrep::to_string(state_)
<< " -> " << trrep::to_string(next_state);
std::cerr << os.str() << "\n";
throw trrep::runtime_error(os.str());
}
}
int trrep::transaction_context::certify_fragment()
{
throw trrep::not_implemented_error();
}
int trrep::transaction_context::certify_commit(
trrep::unique_lock<trrep::mutex>& lock)
{
assert(lock.owns_lock());
assert(id_ != trrep::transaction_id::invalid());
client_context_.wait_for_replayers(lock);
assert(lock.owns_lock());
if (state() == s_must_abort)
{
client_context_.override_error(trrep::e_deadlock_error);
return 1;
}
state(lock, s_certifying);
lock.unlock();
trrep::data data;
if (client_context_.prepare_data_for_replication(*this, data))
{
// Note: Error must be set by prepare_data_for_replication()
lock.lock();
state(lock, s_must_abort);
return 1;
}
if (client_context_.killed())
{
lock.lock();
client_context_.override_error(trrep::e_deadlock_error);
state(lock, s_must_abort);
return 1;
}
wsrep_status cert_ret(provider_.certify_commit(client_context_.id().get(),
&ws_handle_,
flags() | WSREP_FLAG_TRX_END,
&trx_meta_));
lock.lock();
assert(state() == s_certifying || state() == s_must_abort);
client_context_.debug_sync("wsrep_after_replication");
int ret(1);
switch (cert_ret)
{
case WSREP_OK:
assert(ordered());
switch (state())
{
case s_certifying:
certified_ = true;
state(lock, s_committing);
ret = 0;
break;
case s_must_abort:
// We got BF aborted after succesful certification
// and before acquiring client context lock. This means that
// the trasaction must be replayed.
client_context_.will_replay(*this);
state(lock, s_must_replay);
break;
default:
assert(0);
break;
}
certified_ = true;
break;
case WSREP_WARNING:
assert(ordered() == false);
state(lock, s_must_abort);
client_context_.override_error(trrep::e_error_during_commit);
break;
case WSREP_TRX_MISSING:
state(lock, s_must_abort);
// The execution should never reach this point if the
// transaction has not generated any keys or data.
client_context_.override_error(trrep::e_error_during_commit);
assert(0);
break;
case WSREP_BF_ABORT:
// Transaction was replicated succesfully and it was either
// certified succesfully or the result of certifying is not
// yet known. Therefore the transaction must roll back
// and go through replay either to replay and commit the whole
// transaction or to determine failed certification status.
assert(ordered());
client_context_.will_replay(*this);
state(lock, s_must_abort);
state(lock, s_must_replay);
break;
case WSREP_TRX_FAIL:
state(lock, s_cert_failed);
client_context_.override_error(trrep::e_deadlock_error);
break;
case WSREP_SIZE_EXCEEDED:
state(lock, s_must_abort);
client_context_.override_error(trrep::e_error_during_commit);
break;
case WSREP_CONN_FAIL:
case WSREP_NODE_FAIL:
state(lock, s_must_abort);
client_context_.override_error(trrep::e_error_during_commit);
break;
case WSREP_FATAL:
client_context_.abort();
break;
case WSREP_NOT_IMPLEMENTED:
case WSREP_NOT_ALLOWED:
client_context_.override_error(trrep::e_error_during_commit);
state(lock, s_must_abort);
assert(0);
break;
default:
client_context_.override_error(trrep::e_error_during_commit);
break;
}
return ret;
}
void trrep::transaction_context::remove_fragments()
{
throw trrep::not_implemented_error();
}
void trrep::transaction_context::clear_fragments()
{
throw trrep::not_implemented_error();
}
void trrep::transaction_context::cleanup()
{
std::cerr << "Cleanup transaction "
<< client_context_.id().get()
<< ": " << id_.get() << "\n";
id_ = trrep::transaction_id::invalid();
state_ = s_executing;
state_hist_.clear();
trx_meta_.gtid = WSREP_GTID_UNDEFINED;
trx_meta_.stid.node = WSREP_UUID_UNDEFINED;
trx_meta_.stid.trx = trrep::transaction_id::invalid();
trx_meta_.stid.conn = trrep::client_id::invalid();
certified_ = false;
pa_unsafe_ = false;
}

188
src/transaction_context.hpp Normal file
View File

@ -0,0 +1,188 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_TRANSACTION_CONTEXT_HPP
#define TRREP_TRANSACTION_CONTEXT_HPP
#include "provider.hpp"
#include "lock.hpp"
#include <wsrep_api.h>
#include <cassert>
#include <vector>
namespace trrep
{
class client_context;
class key;
class data;
class transaction_id
{
public:
template <typename I>
transaction_id(I id)
: id_(static_cast<wsrep_trx_id_t>(id))
{ }
wsrep_trx_id_t get() const { return id_; }
static wsrep_trx_id_t invalid() { return wsrep_trx_id_t(-1); }
bool operator==(const transaction_id& other) const
{ return (id_ == other.id_); }
bool operator!=(const transaction_id& other) const
{ return (id_ != other.id_); }
private:
wsrep_trx_id_t id_;
};
class transaction_context
{
public:
enum state
{
s_executing,
s_certifying,
s_committing,
s_ordered_commit,
s_committed,
s_cert_failed,
s_must_abort,
s_aborting,
s_aborted,
s_must_replay,
s_replaying
};
static const int n_states = s_replaying + 1;
transaction_context(trrep::provider& provider,
trrep::client_context& client_context)
: provider_(provider)
, client_context_(client_context)
, id_(transaction_id::invalid())
, state_(s_executing)
, state_hist_()
, ws_handle_()
, trx_meta_()
, pa_unsafe_(false)
, certified_(false)
{ }
#if 0
transaction_context(trrep::provider& provider,
trrep::client_context& client_context,
const transaction_id& id)
: provider_(provider)
, client_context_(client_context)
, id_(id)
, state_(s_executing)
, ws_handle_()
, gtid_()
{ }
#endif
// Accessors
trrep::transaction_id id() const
{ return id_; }
bool active() const
{ return (id_ != trrep::transaction_id::invalid()); }
enum state state() const
{ return state_; }
void state(trrep::unique_lock<trrep::mutex>&, enum state);
// Return true if the certification of the last
// fragment succeeded
bool certified() { return certified_; }
// Return true if the last fragment was ordered by the
// provider
bool ordered() { return (trx_meta_.gtid.seqno > 0); }
bool is_streaming() const
{
// Streaming support not yet implemented
return false;
}
bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
//
int start_transaction(const trrep::transaction_id& id)
{
assert(active() == false);
id_ = id;
ws_handle_.trx_id = id_.get();
return provider_.start_transaction(&ws_handle_);
}
int append_key(const trrep::key&);
int append_data(const trrep::data&);
int after_row();
int before_prepare();
int after_prepare();
int before_commit();
int ordered_commit();
int after_commit();
int before_rollback();
int after_rollback();
int before_statement();
int after_statement();
private:
uint32_t flags() const
{
uint32_t ret(0);
if (is_streaming() == false) ret |= WSREP_FLAG_TRX_START;
if (pa_unsafe()) ret |= WSREP_FLAG_PA_UNSAFE;
return ret;
}
int certify_fragment();
int certify_commit(trrep::unique_lock<trrep::mutex>&);
void remove_fragments();
void clear_fragments();
void cleanup();
trrep::provider& provider_;
trrep::client_context& client_context_;
trrep::transaction_id id_;
enum state state_;
std::vector<enum state> state_hist_;
wsrep_ws_handle_t ws_handle_;
wsrep_trx_meta_t trx_meta_;
bool pa_unsafe_;
bool certified_;
};
static inline std::string to_string(enum trrep::transaction_context::state state)
{
switch (state)
{
case trrep::transaction_context::s_executing: return "executing";
case trrep::transaction_context::s_certifying: return "certifying";
case trrep::transaction_context::s_committing: return "committing";
case trrep::transaction_context::s_ordered_commit: return "ordered_commit";
case trrep::transaction_context::s_committed: return "committed";
case trrep::transaction_context::s_cert_failed: return "cert_failed";
case trrep::transaction_context::s_must_abort: return "must_abort";
case trrep::transaction_context::s_aborting: return "aborting";
case trrep::transaction_context::s_aborted: return "aborted";
case trrep::transaction_context::s_must_replay: return "must_replay";
case trrep::transaction_context::s_replaying: return "replaying";
default: return "unknown";
}
}
}
#endif // TRREP_TRANSACTION_CONTEXT_HPP

View File

@ -0,0 +1,349 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "transaction_context.hpp"
#include "client_context.hpp"
#include "server_context.hpp"
#include "provider.hpp"
#include "mock_provider_impl.hpp"
#include <boost/test/unit_test.hpp>
//
// Utility functions
//
namespace
{
// Simple BF abort method to BF abort unordered transasctions
void bf_abort_unordered(trrep::client_context& cc,
trrep::transaction_context& tc)
{
trrep::unique_lock<trrep::mutex> lock(cc.mutex());
tc.state(lock, trrep::transaction_context::s_must_abort);
}
// BF abort method to abort transactions via provider
void bf_abort_provider(trrep::mock_provider_impl& provider,
const trrep::client_context& cc,
const trrep::transaction_context& tc,
wsrep_seqno_t seqno)
{
provider.bf_abort(cc.id().get(), tc.id().get(), seqno);
}
}
//
// Test a succesful 1PC transaction lifecycle
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Run before commit
BOOST_REQUIRE(tc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing);
// Run ordered commit
BOOST_REQUIRE(tc.ordered_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit);
// Run after commit
BOOST_REQUIRE(tc.after_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
//
// Test a succesful 2PC transaction lifecycle
//
BOOST_AUTO_TEST_CASE(transaction_context_2pc)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Run before prepare
BOOST_REQUIRE(tc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Run after prepare
BOOST_REQUIRE(tc.after_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing);
// Run before commit
BOOST_REQUIRE(tc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing);
// Run ordered commit
BOOST_REQUIRE(tc.ordered_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit);
// Run after commit
BOOST_REQUIRE(tc.after_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
//
// Test a 1PC transaction which gets BF aborted before before_commit
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_unordered(cc, tc);
// Run before commit
BOOST_REQUIRE(tc.before_commit());
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_abort);
BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(tc.ordered() == false);
// Rollback sequence
BOOST_REQUIRE(tc.before_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting);
BOOST_REQUIRE(tc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
//
// Test a 2PC transaction which gets BF aborted before before_prepare
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_prepare)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_unordered(cc, tc);
// Run before commit
BOOST_REQUIRE(tc.before_prepare());
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_abort);
BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(tc.ordered() == false);
// Rollback sequence
BOOST_REQUIRE(tc.before_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting);
BOOST_REQUIRE(tc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
//
// Test a 2PC transaction which gets BF aborted before before_prepare
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_after_prepare)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Run before prepare
BOOST_REQUIRE(tc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_unordered(cc, tc);
// Run before commit
BOOST_REQUIRE(tc.after_prepare());
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_abort);
BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(tc.ordered() == false);
// Rollback sequence
BOOST_REQUIRE(tc.before_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting);
BOOST_REQUIRE(tc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
//
// Test a 1PC transaction which gets BF aborted during before_commit via
// provider before the write set was ordered and certified.
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_provider(mock_provider, cc, tc, WSREP_SEQNO_UNDEFINED);
// Run before commit
BOOST_REQUIRE(tc.before_commit());
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_cert_failed);
BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(tc.ordered() == false);
// Rollback sequence
BOOST_REQUIRE(tc.before_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting);
BOOST_REQUIRE(tc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
//
// Test a 1PC transaction which gets BF aborted during before_commit via
// provider after the write set was ordered and certified.
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::transaction_context tc(provider, cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
// Start a new transaction with ID 1
tc.start_transaction(1);
BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_provider(mock_provider, cc, tc, 1);
// Run before commit
BOOST_REQUIRE(tc.before_commit());
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_replay);
BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(tc.ordered() == true);
// Rollback sequence
BOOST_REQUIRE(tc.before_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_replay);
BOOST_REQUIRE(tc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_replay);
// Cleanup after statement
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}

7
src/trrep_test.cpp Normal file
View File

@ -0,0 +1,7 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#define BOOST_TEST_MODULE trrep_test
#include <boost/test/included/unit_test.hpp>