From 3b428ff0b7d36d016e05fb99efc439c1f38e345d Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Mon, 16 Apr 2018 14:37:58 +0300 Subject: [PATCH] Imported initial implementation --- CMakeLists.txt | 43 +++ README.md | 182 +++++++++++ src/CMakeLists.txt | 21 ++ src/client_context.cpp | 16 + src/client_context.hpp | 122 ++++++++ src/data.hpp | 36 +++ src/exception.hpp | 31 ++ src/key.hpp | 41 +++ src/lock.hpp | 57 ++++ src/mock_provider_impl.hpp | 135 ++++++++ src/mutex.hpp | 57 ++++ src/provider.cpp | 13 + src/provider.hpp | 52 ++++ src/provider_impl.hpp | 31 ++ src/server_context.hpp | 56 ++++ src/transaction_context.cpp | 517 +++++++++++++++++++++++++++++++ src/transaction_context.hpp | 188 +++++++++++ src/transaction_context_test.cpp | 349 +++++++++++++++++++++ src/trrep_test.cpp | 7 + 19 files changed, 1954 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 README.md create mode 100644 src/CMakeLists.txt create mode 100644 src/client_context.cpp create mode 100644 src/client_context.hpp create mode 100644 src/data.hpp create mode 100644 src/exception.hpp create mode 100644 src/key.hpp create mode 100644 src/lock.hpp create mode 100644 src/mock_provider_impl.hpp create mode 100644 src/mutex.hpp create mode 100644 src/provider.cpp create mode 100644 src/provider.hpp create mode 100644 src/provider_impl.hpp create mode 100644 src/server_context.hpp create mode 100644 src/transaction_context.cpp create mode 100644 src/transaction_context.hpp create mode 100644 src/transaction_context_test.cpp create mode 100644 src/trrep_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..7f23664 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,43 @@ +# +# Copyright (C) 2018 Codership Oy +# + +cmake_minimum_required (VERSION 2.8) +project (wsrep-cpp) +include(CheckIncludeFile) +include(CTest) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Werror -Weffc++ -Woverloaded-virtual -Wno-non-virtual-dtor -g") + +if (NOT WITH_WSREP_API) + message(ERROR "Wsrep API location must be provided") +else() + check_include_file("${WITH_WSREP_API}/wsrep_api.h" HAVE_WSREP_API_HPP) + include_directories("${WITH_WSREP_API}") +endif() + +find_package(Boost 1.54.0 REQUIRED + unit_test_framework + ) + +# Coverage +# To produce a coverage report, call cmake with -DWITH_COVERAGE=ON, +# run +# +# make +# make test +# make ExperimentalCoverage +# make coverage_report +# +# The coverage report output will be in directory root index.html +# +if (WITH_COVERAGE) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fprofile-arcs -ftest-coverage") +endif() +add_custom_target(coverage_report + lcov --capture --directory . --output lcov.info --no-external + COMMAND genhtml --output-directory coverage_report lcov.info) + + +add_subdirectory(src) diff --git a/README.md b/README.md new file mode 100644 index 0000000..40715d5 --- /dev/null +++ b/README.md @@ -0,0 +1,182 @@ +# Introduction + +Project name: Trrep - Transaction Replication + +The purpose of this project is to implement C++ wrapper +for wsrep API with additional convenience for transaction +processing. + +Integration of wsrep API to mysql-wsrep has shown that even +though wsrep API is supposed to provide self contained API +for transaction replication, also DBMS side integration requires +lots of inter thread coordination and state book keeping due to BF +aborts caused by high priority transactions. Also transaction states +are not exposed by the wsrep API, which essentially requires the +DBMS integration to duplicate the state tracking independently of +the wsrep provider. + +This project will abstract away most of the transaction state +keeping required on DBMS side and will provide simple +C++ interface for key set population, data set population, +(2PC) commit processing, write set applying etc. + +This project will also provide unit testing capability for +DBMS integration and wsrep provider implementation. + + +The rest of the document dscribes the proposed API in more detail +and should be replaced with automatically generated documentation +later on (doxygen). + +# High Level Design + +## Provider + +Context for wsrep provider which will encapsulate provider loading, +configuration, registering callbacks. + +The DBMS implementation will provide an instance of provider +observer class which is called appropriately when the wsrep provider +calls the registered callbacks. + + + +## Server Context + +Server attributes: +* Identifiers (name, uuid) + +Server capabilities: +* Rollback mode (async or sync) +* + +Abstract Methods: +* on_connect() +* on_view() +* on_sync() +* on_apply() +* on_commit() + + +### Rollback mode + +High priority transactions may induce BF aborts due to lock conflicts +during HPT applying. The locally running transaction must be brute force +aborted (BF abort) if the local transaction holds a lock which +HPT is trying to lock and the local transaction is either not ordered +or is ordered after HPT. If HTP can just steal the particular lock +and mark the LT as BF abort victim, the rollback is said to be asynchronous; +client thread will process the rollback once it gains the control again. +On the other hand, if the rollback mode is synchronous, the HPT +must wait until the LT has been rolled back and locks have been +released before it can continue applying. In such a case a background +rollbacker thread may be used to run the rollback process if the +client thread is currently idle, i.e. the control is in the client +program which drives the transaction. + +## Client Context + +Attributes +* Identifier +* Mode (local, applier) +* State (idle, exec, quit) + +Abstract methods: +* before_command() - executed before a command from client is processed +* after_command() - executed after a command from client has been processed +* before_statement() - executed before a statement from client is processed +* after_statement - executed after a statement from a client has been processed + +The following subsections briefly describe what would be the +purpose of the abstract methods in mysql-wsrep implementattion. +In other DBMS systems the purpose may vary. + +### Before_command() + +If the server supports only synchronous rollback, this method must make +sure that any rollback rollback processing is finished before the control +is given back to the client thread. + +### After_command() + +Bookkeeping. + +### Before_statement() + +Checks which are run before the actual parsed statement is executed. +These may include: +* Check if dirty reads are allowed +* ... + +### After_statement() + +Checks if the statement can and should be retried. + +## Transaction Context + +Attributes +* Client context - pointer or reference to client context +* Identifier - an identifier which uniquely identifies the transaction + on the server the client driven execution happens + +## Local Transaction Replication + +See https://github.com/codership/Engineering/wiki/Transaction_coordinator_design + +Attributes: +* Identifier (interger, uuid or similar) +* State (executing, certifying, must_abort, etc) + +Methods: +* append_key() +* append_data() +* after_row() +* before_prepare() +* after_prepare() + +### Append Key + +Appends a certification key into transaction key set buffer. + +### Append Data + +Append a write set data fragment into transaction write set +buffer. + +### After Row + +Streaming replication specific. + +### Before_prepare() + +This method is executed before the prepare phase is run for DBMS +engines. + +### After_prepare() + +This method is executed after the prepare phase is run for DBMS engines. +The populated write set is replicated and certified at this stage. +If the streaming replication is enabled, the final write set may +only contain an empty commit fragment which triggers commit +on remote servers. + +## Remote Transaction Applying + +Methods: +* apply() + +## Commit Order Coordinator + +Commit order coordinator is a common for both local and remote +transaction. Currently this is implemented in transaction_context +class, check out later if it make sense to refactor commit ordering +into separate class. + +Methods: + +* before_commit() +* ordered_commit() +* after_commit() + + + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..be779f5 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,21 @@ +# +# Copyright (C) 2018 Codership Oy +# + + + +add_library(trrep + provider.cpp + client_context.cpp + transaction_context.cpp) + +add_executable(trrep_test + transaction_context_test.cpp + trrep_test.cpp + ) +target_link_libraries(trrep_test trrep) +add_test(NAME trrep_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/trrep_test + ) + diff --git a/src/client_context.cpp b/src/client_context.cpp new file mode 100644 index 0000000..3399472 --- /dev/null +++ b/src/client_context.cpp @@ -0,0 +1,16 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "client_context.hpp" +#include "transaction_context.hpp" + +// TODO: This should be pure virtual method, implemented by +// DBMS integration or mock classes only. +int trrep::client_context::replay(trrep::transaction_context& tc) +{ + trrep::unique_lock lock(mutex_); + tc.state(lock, trrep::transaction_context::s_replaying); + tc.state(lock, trrep::transaction_context::s_committed); + return 0; +} diff --git a/src/client_context.hpp b/src/client_context.hpp new file mode 100644 index 0000000..80f38d7 --- /dev/null +++ b/src/client_context.hpp @@ -0,0 +1,122 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_CLIENT_CONTEXT_HPP +#define TRREP_CLIENT_CONTEXT_HPP + +#include "provider.hpp" +#include "mutex.hpp" +#include "lock.hpp" +#include "data.hpp" + +namespace trrep +{ + class server_context; + class transaction_context; + + enum client_error + { + e_success, + e_error_during_commit, + e_deadlock_error + }; + + class client_id + { + public: + template + client_id(I id) + : id_(static_cast(id)) + { } + wsrep_conn_id_t get() const { return id_; } + static wsrep_conn_id_t invalid() { return -1; } + private: + wsrep_conn_id_t id_; + }; + + class client_context + { + public: + enum mode + { + m_local, + m_applier + }; + + enum state + { + s_idle, + s_exec, + s_quitting + }; + + client_context(trrep::server_context& server_context, + client_id id, + enum mode mode) + : mutex_() + , server_context_(server_context) + , id_(id) + , mode_(mode) + , state_(s_idle) + { } + + virtual ~client_context() { } + // Accessors + trrep::mutex& mutex() { return mutex_; } + const trrep::server_context& server_context() const + { return server_context_; } + client_id id() const { return id_; } + enum mode mode() const { return mode_; } + enum state state() const { return state_; } + + // + // + // + virtual void before_command() { } + + virtual void after_command() { } + + virtual int before_statement() { return 0; } + + virtual int after_statement() { return 0; } + + virtual int rollback(trrep::transaction_context&) { return 0; } + + virtual void will_replay(trrep::transaction_context&) { } + virtual int replay(trrep::transaction_context& tc); + + virtual void wait_for_replayers(trrep::unique_lock&) + { } + virtual int prepare_data_for_replication( + const trrep::transaction_context&, trrep::data& data) + { + static const char buf[1] = { 1 }; + data.assign(buf, 1); + return 0; + } + virtual void override_error(const trrep::client_error&) { } + virtual bool killed() const { return 0; } + virtual void abort() const { ::abort(); } + // Debug helpers + virtual void debug_sync(const std::string&) + { + + } + virtual void debug_suicide(const std::string&) + { + abort(); + } + private: + + void state(enum state state); + + trrep::mutex mutex_; + trrep::server_context& server_context_; + client_id id_; + enum mode mode_; + enum state state_; + }; +} + +#endif // TRREP_CLIENT_CONTEXT_HPP diff --git a/src/data.hpp b/src/data.hpp new file mode 100644 index 0000000..03445a6 --- /dev/null +++ b/src/data.hpp @@ -0,0 +1,36 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_DATA_HPP +#define TRREP_DATA_HPP + +namespace trrep +{ + class data + { + public: + data() + : buf_() + { + assign(0, 0); + } + data(const void* ptr, size_t len) + : buf_() + { + assign(ptr, len); + } + + void assign(const void* ptr, size_t len) + { + buf_.ptr = ptr; + buf_.len = len; + } + + const wsrep_buf_t& get() const { return buf_; } + private: + wsrep_buf_t buf_; + }; +} + +#endif // TRREP_DATA_HPP diff --git a/src/exception.hpp b/src/exception.hpp new file mode 100644 index 0000000..76cd7d9 --- /dev/null +++ b/src/exception.hpp @@ -0,0 +1,31 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_EXCEPTION_HPP +#define TRREP_EXCEPTION_HPP + +#include + +namespace trrep +{ + class runtime_error : public std::runtime_error + { + public: + runtime_error(const std::string& msg) + : std::runtime_error(msg) + { } + }; + + class not_implemented_error : public std::exception + { + public: + not_implemented_error() + : std::exception() + { } + }; + +} + + +#endif // TRREP_EXCEPTION_HPP diff --git a/src/key.hpp b/src/key.hpp new file mode 100644 index 0000000..c7b2fd2 --- /dev/null +++ b/src/key.hpp @@ -0,0 +1,41 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_KEY_HPP +#define TRREP_KEY_HPP + +#include "exception.hpp" + +namespace trrep +{ + class key + { + public: + key() + : key_parts_() + , key_() + { + key_.key_parts = key_parts_; + key_.key_parts_num = 0; + } + + void append_key_part(const void* ptr, size_t len) + { + if (key_.key_parts_num == 3) + { + throw trrep::runtime_error("key parts exceed maximum of 3"); + } + key_parts_[key_.key_parts_num].ptr = ptr; + key_parts_[key_.key_parts_num].len = len; + ++key_.key_parts_num; + } + + const wsrep_key_t& get() const { return key_; } + private: + wsrep_buf_t key_parts_[3]; + wsrep_key_t key_; + }; +} + +#endif // TRREP_KEY_HPP diff --git a/src/lock.hpp b/src/lock.hpp new file mode 100644 index 0000000..9b3cadf --- /dev/null +++ b/src/lock.hpp @@ -0,0 +1,57 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_LOCK_HPP +#define TRREP_LOCK_HPP + +#include "mutex.hpp" + +#include + +namespace trrep +{ + template + class unique_lock + { + public: + unique_lock(M& mutex) + : mutex_(mutex) + , locked_(false) + { + mutex_.lock(); + locked_ = true; + } + ~unique_lock() + { + assert(locked_); + mutex_.unlock(); + } + + void lock() + { + mutex_.lock(); + assert(locked_ == false); + locked_ = true; + } + + void unlock() + { + assert(locked_); + locked_ = false; + mutex_.unlock(); + } + + bool owns_lock() const + { + return locked_; + } + private: + unique_lock(const unique_lock&); + unique_lock& operator=(const unique_lock&); + M& mutex_; + bool locked_; + }; +} + +#endif // TRREP_LOCK_HPP diff --git a/src/mock_provider_impl.hpp b/src/mock_provider_impl.hpp new file mode 100644 index 0000000..b1edbca --- /dev/null +++ b/src/mock_provider_impl.hpp @@ -0,0 +1,135 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_MOCK_PROVIDER_IMPL_HPP +#define TRREP_MOCK_PROVIDER_IMPL_HPP + +#include "provider_impl.hpp" + +#include +#include +#include // todo: proper logging + +namespace trrep +{ + class mock_provider_impl : public trrep::provider_impl + { + public: + typedef std::map, wsrep_seqno_t > bf_abort_map; + mock_provider_impl() + : group_id_() + , node_id_() + , group_seqno_(0) + , bf_abort_map_() + { + + memset(&group_id_, 0, sizeof(group_id_)); + group_id_.data[0] = 1; + + memset(&node_id_, 0, sizeof(node_id_)); + node_id_.data[0] = 1; + } + // Provider implemenatation interface + int start_transaction(wsrep_ws_handle_t*) { return 0; } + wsrep_status + certify_commit(wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* ws_handle, + uint32_t flags, + wsrep_trx_meta_t* trx_meta) + { + assert(flags | WSREP_FLAG_TRX_END); + if ((flags | WSREP_FLAG_TRX_END) == 0) + { + return WSREP_FATAL; + } + std::cerr << "certify_commit: " << conn_id << ":" + << ws_handle->trx_id << "\n"; + trx_meta->stid.node = node_id_; + trx_meta->stid.trx = ws_handle->trx_id; + trx_meta->stid.conn = conn_id; + + std::pair + trx_id(conn_id, ws_handle->trx_id); + bf_abort_map::iterator it(bf_abort_map_.find(trx_id)); + std::cerr << "bf aborted: " + << (it == bf_abort_map_.end() ? "no" : "yes") << "\n"; + if (it == bf_abort_map_.end()) + { + ++group_seqno_; + trx_meta->gtid.uuid = group_id_; + trx_meta->gtid.seqno = group_seqno_; + trx_meta->depends_on = group_seqno_ - 1; + std::cerr << "certify_commit return: " << WSREP_OK << "\n"; + return WSREP_OK; + } + else + { + wsrep_status_t ret; + if (it->second == WSREP_SEQNO_UNDEFINED) + { + trx_meta->gtid.uuid = WSREP_UUID_UNDEFINED; + trx_meta->gtid.seqno = WSREP_SEQNO_UNDEFINED; + trx_meta->depends_on = WSREP_SEQNO_UNDEFINED; + ret = WSREP_TRX_FAIL; + } + else + { + ++group_seqno_; + trx_meta->gtid.uuid = group_id_; + trx_meta->gtid.seqno = group_seqno_; + trx_meta->depends_on = group_seqno_ - 1; + ret = WSREP_BF_ABORT; + } + bf_abort_map_.erase(it); + std::cerr << "certify_commit return: " << ret << "\n"; + return ret; + } + + } + + int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } + int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } + int rollback(const wsrep_trx_id_t) { return 0; } + wsrep_status_t commit_order_enter(wsrep_ws_handle_t*) + { return WSREP_OK; } + int commit_order_leave(wsrep_ws_handle_t*) { return 0;} + int release(wsrep_ws_handle_t*) { return 0; } + + // Methods to modify mock state + + // Inject BF abort event into the provider. + // + // If bf_seqno equals to WSREP_SEQNO_UNDEFINED, the BF abort + // is done without ordering and the provider will return + // WSREP_TRX_FAIL to the aborted transaction. Otherwise + // WSREP_BF_ABORT is returned to the aborted transaction. + // + // @param conn_id Connection/client identifier to be aborted + // @param trx_id Trx id to be aborted + // @param bf_seqno Aborter sequence number + // + void bf_abort(wsrep_conn_id_t conn_id, + wsrep_trx_id_t trx_id, + wsrep_seqno_t bf_seqno) + { + std::cerr << "bf_abort: " << conn_id << ":" << trx_id << "\n"; + assert(bf_seqno == WSREP_SEQNO_UNDEFINED || + group_seqno_ < bf_seqno); + bf_abort_map_.insert(std::make_pair(std::make_pair(conn_id, trx_id), + bf_seqno)); + if (bf_seqno != WSREP_SEQNO_UNDEFINED) + { + group_seqno_ = bf_seqno; + } + } + private: + wsrep_uuid_t group_id_; + wsrep_uuid_t node_id_; + wsrep_seqno_t group_seqno_; + bf_abort_map bf_abort_map_; + }; +} + + +#endif // TRREP_MOCK_PROVIDER_IMPL_HPP diff --git a/src/mutex.hpp b/src/mutex.hpp new file mode 100644 index 0000000..ec33b7e --- /dev/null +++ b/src/mutex.hpp @@ -0,0 +1,57 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_MUTEX_HPP +#define TRREP_MUTEX_HPP + +#include "exception.hpp" + +#include + +namespace trrep +{ + // Default pthread implementation + + class mutex + { + public: + mutex() + : mutex_() + { + if (pthread_mutex_init(&mutex_, 0)) + { + throw trrep::runtime_error("mutex init failed"); + } + } + ~mutex() + { + if (pthread_mutex_destroy(&mutex_)) + { + throw trrep::runtime_error("mutex destroy failed"); + } + } + + void lock() + { + if (pthread_mutex_lock(&mutex_)) + { + throw trrep::runtime_error("mutex lock failed"); + } + } + + void unlock() + { + if (pthread_mutex_unlock(&mutex_)) + { + throw trrep::runtime_error("mutex unlock failed"); + } + } + private: + mutex(const mutex& other); + mutex& operator=(const mutex& other); + pthread_mutex_t mutex_; + }; +} + +#endif // TRREP_MUTEX_HPP diff --git a/src/provider.cpp b/src/provider.cpp new file mode 100644 index 0000000..93ac820 --- /dev/null +++ b/src/provider.cpp @@ -0,0 +1,13 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "provider.hpp" + +#include "provider_impl.hpp" + +trrep::provider* trrep::provider::make_provider( + const std::string&) +{ + return 0; +} diff --git a/src/provider.hpp b/src/provider.hpp new file mode 100644 index 0000000..9da1c2a --- /dev/null +++ b/src/provider.hpp @@ -0,0 +1,52 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_PROVIDER_HPP +#define TRREP_PROVIDER_HPP + +#include "provider_impl.hpp" + +#include + +#include + +namespace trrep +{ + class provider + { + + public: + + // Construct a provider with give provider implementation + // + provider(provider_impl* impl) : impl_(impl) { } + int start_transaction(wsrep_ws_handle_t* wsh) + { return impl_->start_transaction(wsh); } + int append_key(wsrep_ws_handle_t* wsh, const wsrep_key_t* key) + { return impl_->append_key(wsh, key); } + int append_data(wsrep_ws_handle_t* wsh, const wsrep_buf_t* buf) + { return impl_->append_data(wsh, buf); } + wsrep_status certify_commit(wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* wsh, + uint32_t flags, wsrep_trx_meta_t* trx_meta) + { return impl_->certify_commit(conn_id, wsh, flags, trx_meta); } + int rollback(const wsrep_trx_id_t trx_id) + { return impl_->rollback(trx_id); } + wsrep_status commit_order_enter(wsrep_ws_handle_t* wsh) + { return impl_->commit_order_enter(wsh); } + int commit_order_leave(wsrep_ws_handle_t* wsh) + { return impl_->commit_order_leave(wsh); } + int release(wsrep_ws_handle_t* wsh) + { return impl_->release(wsh); } + + // Load the provider + // @param Path to provider library or "none" to load + // dummy provider + static provider* make_provider(const std::string& provider); + private: + provider_impl* impl_; + }; +} + +#endif // TRREP_PROVIDER_HPP diff --git a/src/provider_impl.hpp b/src/provider_impl.hpp new file mode 100644 index 0000000..511558f --- /dev/null +++ b/src/provider_impl.hpp @@ -0,0 +1,31 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_PROVIDER_IMPL_HPP +#define TRREP_PROVIDER_IMPL_HPP + +#include + +namespace trrep +{ + // Abstract interface for provider implementations + class provider_impl + { + public: + virtual int start_transaction(wsrep_ws_handle_t*) = 0; + virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0; + virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0; + virtual wsrep_status + certify_commit(wsrep_conn_id_t, wsrep_ws_handle_t*, + uint32_t, + wsrep_trx_meta_t*) = 0; + virtual int rollback(const wsrep_trx_id_t) = 0; + virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0; + virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; + virtual int release(wsrep_ws_handle_t*) = 0; + }; +} + + +#endif // TRREP_PROVIDER_IMPL_HPP diff --git a/src/server_context.hpp b/src/server_context.hpp new file mode 100644 index 0000000..a4cab90 --- /dev/null +++ b/src/server_context.hpp @@ -0,0 +1,56 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_SERVER_CONTEXT_HPP +#define TRREP_SERVER_CONTEXT_HPP + +#include + +namespace trrep +{ + // Forward declarations + class transaction_context; + + class server_context + { + public: + + enum rollback_mode + { + rm_async, + rm_sync + }; + + server_context(const std::string& name, + const std::string& id, + enum rollback_mode rollback_mode) + : name_(name) + , id_(id) + , rollback_mode_(rollback_mode) + { } + const std::string& name() const { return name_; } + const std::string& id() const { return id_; } + + virtual void on_connect() { } + virtual void on_view() { } + virtual void on_sync() { } + virtual void on_apply(trrep::transaction_context&) { } + virtual void on_commit(trrep::transaction_context&) { } + + + virtual bool statement_allowed_for_streaming( + const trrep::client_context&, + const trrep::transaction_context&) const + { + // Streaming not implemented yet + return false; + } + private: + std::string name_; + std::string id_; + enum rollback_mode rollback_mode_; + }; +} + +#endif // TRREP_SERVER_CONTEXT_HPP diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp new file mode 100644 index 0000000..ce30d31 --- /dev/null +++ b/src/transaction_context.cpp @@ -0,0 +1,517 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "transaction_context.hpp" +#include "client_context.hpp" +#include "server_context.hpp" +#include "key.hpp" +#include "data.hpp" + +#include // TODO: replace with proper logging utility +#include + +// Public + +int trrep::transaction_context::append_key(const trrep::key& key) +{ + + return provider_.append_key(&ws_handle_, &key.get()); +} + +int trrep::transaction_context::append_data(const trrep::data& data) +{ + + return provider_.append_data(&ws_handle_, &data.get()); +} + +int trrep::transaction_context::before_prepare() +{ + int ret(0); + + trrep::unique_lock lock(client_context_.mutex()); + + assert(client_context_.mode() == trrep::client_context::m_local); + assert(state() == s_executing || state() == s_must_abort); + + if (state() == s_must_abort) + { + return 1; + } + + if (is_streaming()) + { + client_context_.debug_suicide( + "crash_last_fragment_commit_before_fragment_removal"); + lock.unlock(); + if (client_context_.server_context().statement_allowed_for_streaming( + client_context_, *this)) + { + client_context_.override_error(trrep::e_error_during_commit); + ret = 1; + } + else + { + remove_fragments(); + } + lock.lock(); + client_context_.debug_suicide( + "crash_last_fragment_commit_after_fragment_removal"); + } + assert(client_context_.mode() == trrep::client_context::m_local); + assert(state() == s_executing); + return ret; +} + +int trrep::transaction_context::after_prepare() +{ + int ret(1); + trrep::unique_lock lock(client_context_.mutex()); + + assert(client_context_.mode() == trrep::client_context::m_local); + assert(state() == s_executing || state() == s_must_abort); + if (state() == s_must_abort) + { + return 1; + } + + if (state() == s_executing) + { + ret = certify_commit(lock); + assert((ret == 0 || state() == s_committing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + } + else + { + assert(state() == s_must_abort); + client_context_.override_error(trrep::e_deadlock_error); + } + assert(client_context_.mode() == trrep::client_context::m_local); + return ret; +} + +int trrep::transaction_context::before_commit() +{ + int ret(1); + + trrep::unique_lock lock(client_context_.mutex()); + switch (client_context_.mode()) + { + case trrep::client_context::m_local: + + // Commit is one phase - before/after prepare was not called + if (state() == s_executing) + { + ret = certify_commit(lock); + assert((ret == 0 || state() == s_committing) + || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + } + else if (state() != s_committing) + { + assert(state() == s_must_abort); + client_context_.override_error(trrep::e_deadlock_error); + } + else + { + // 2PC commit, prepare was done before + ret = 0; + } + if (ret == 0) + { + lock.unlock(); + switch(provider_.commit_order_enter(&ws_handle_)) + { + case WSREP_OK: + break; + case WSREP_BF_ABORT: + state(lock, s_must_abort); + ret = 1; + break; + default: + ret = 1; + assert(0); + break; + } + lock.lock(); + } + + break; + case trrep::client_context::m_applier: + ret = provider_.commit_order_enter(&ws_handle_); + if (ret) + { + state(lock, s_must_abort); + } + break; + } + return ret; +} + +int trrep::transaction_context::ordered_commit() +{ + int ret(1); + + trrep::unique_lock lock(client_context_.mutex()); + assert(state() == s_committing); + ret = provider_.commit_order_leave(&ws_handle_); + // Should always succeed + assert(ret == 0); + state(lock, s_ordered_commit); + return ret; +} + +int trrep::transaction_context::after_commit() +{ + int ret(0); + + trrep::unique_lock lock(client_context_.mutex()); + + assert(state() == s_ordered_commit); + + switch (client_context_.mode()) + { + case trrep::client_context::m_local: + if (is_streaming()) + { + clear_fragments(); + } + ret = provider_.release(&ws_handle_); + break; + case trrep::client_context::m_applier: + break; + } + assert(ret == 0); + state(lock, s_committed); + return ret; +} + +int trrep::transaction_context::before_rollback() +{ + trrep::unique_lock lock(client_context_.mutex()); + + assert(state() == s_executing || + state() == s_must_abort || + state() == s_cert_failed || + state() == s_must_replay); + + switch (state()) + { + case s_executing: + // Voluntary rollback + if (is_streaming()) + { + // Replicate rollback fragment + provider_.rollback(id_.get()); + } + state(lock, s_aborting); + break; + case s_must_abort: + if (certified()) + { + state(lock, s_must_replay); + } + else + { + if (is_streaming()) + { + // Replicate rollback fragment + provider_.rollback(id_.get()); + } + state(lock, s_aborting); + } + break; + case s_cert_failed: + if (is_streaming()) + { + // Replicate rollback fragment + provider_.rollback(id_.get()); + } + state(lock, s_aborting); + break; + case s_must_replay: + break; + default: + assert(0); + break; + } + return 0; +} + +int trrep::transaction_context::after_rollback() +{ + trrep::unique_lock lock(client_context_.mutex()); + + assert(state() == s_aborting || + state() == s_must_replay); + + if (state() == s_aborting) + { + state(lock, s_aborted); + } + + // Releasing the transaction from provider is postponed into + // after_statement() hook. Depending on DBMS system all the + // resources acquired by transaction may or may not be released + // during actual rollback. If the transaction has been ordered, + // releasing the commit ordering critical section should be + // also postponed until all resources have been released. + return 0; +} + +int trrep::transaction_context::after_statement() +{ + int ret(0); + trrep::unique_lock lock(client_context_.mutex()); + + assert(state() == s_executing || + state() == s_committed || + state() == s_aborted || + state() == s_must_abort || + state() == s_must_replay); + + switch (state()) + { + case s_executing: + // ? + break; + case s_committed: + if (is_streaming()) + { + state(lock, s_executing); + } + break; + case s_must_abort: + ret = client_context_.rollback(*this); + break; + case s_aborted: + break; + case s_must_replay: + ret = client_context_.replay(*this); + break; + default: + assert(0); + break; + } + + assert(state() == s_executing || + state() == s_committed || + state() == s_aborted); + + if (state() == s_aborted) + { + if (ordered()) + { + ret = provider_.commit_order_enter(&ws_handle_); + if (ret == 0) provider_.commit_order_leave(&ws_handle_); + } + provider_.release(&ws_handle_); + } + + if (state() != s_executing) + { + cleanup(); + } + + return ret; +} + +// Private + +void trrep::transaction_context::state( + trrep::unique_lock& lock __attribute__((unused)), + enum trrep::transaction_context::state next_state) +{ + assert(lock.owns_lock()); + static const char allowed[n_states][n_states] = + { /* ex ce co oc ct cf ma ab ad mr re from/to */ + { 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* ex */ + { 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ + { 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ + { 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ + { 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ + { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */ + { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ + { 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */ + }; + if (allowed[state_][next_state]) + { + std::cerr << "state transition: " << trrep::to_string(state_) + << " -> " << trrep::to_string(next_state) + << "\n"; + state_hist_.push_back(state_); + state_ = next_state; + } + else + { + std::ostringstream os; + os << "unallowed state transition for transaction " + << id_.get() << ": " << trrep::to_string(state_) + << " -> " << trrep::to_string(next_state); + std::cerr << os.str() << "\n"; + throw trrep::runtime_error(os.str()); + } +} + +int trrep::transaction_context::certify_fragment() +{ + throw trrep::not_implemented_error(); +} + +int trrep::transaction_context::certify_commit( + trrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(id_ != trrep::transaction_id::invalid()); + + client_context_.wait_for_replayers(lock); + + assert(lock.owns_lock()); + + if (state() == s_must_abort) + { + client_context_.override_error(trrep::e_deadlock_error); + return 1; + } + + state(lock, s_certifying); + + lock.unlock(); + + trrep::data data; + if (client_context_.prepare_data_for_replication(*this, data)) + { + // Note: Error must be set by prepare_data_for_replication() + lock.lock(); + state(lock, s_must_abort); + return 1; + } + + if (client_context_.killed()) + { + lock.lock(); + client_context_.override_error(trrep::e_deadlock_error); + state(lock, s_must_abort); + return 1; + } + + wsrep_status cert_ret(provider_.certify_commit(client_context_.id().get(), + &ws_handle_, + flags() | WSREP_FLAG_TRX_END, + &trx_meta_)); + + lock.lock(); + + assert(state() == s_certifying || state() == s_must_abort); + client_context_.debug_sync("wsrep_after_replication"); + + int ret(1); + switch (cert_ret) + { + case WSREP_OK: + assert(ordered()); + switch (state()) + { + case s_certifying: + certified_ = true; + state(lock, s_committing); + ret = 0; + break; + case s_must_abort: + // We got BF aborted after succesful certification + // and before acquiring client context lock. This means that + // the trasaction must be replayed. + client_context_.will_replay(*this); + state(lock, s_must_replay); + break; + default: + assert(0); + break; + } + certified_ = true; + break; + case WSREP_WARNING: + assert(ordered() == false); + state(lock, s_must_abort); + client_context_.override_error(trrep::e_error_during_commit); + break; + case WSREP_TRX_MISSING: + state(lock, s_must_abort); + // The execution should never reach this point if the + // transaction has not generated any keys or data. + client_context_.override_error(trrep::e_error_during_commit); + assert(0); + break; + case WSREP_BF_ABORT: + // Transaction was replicated succesfully and it was either + // certified succesfully or the result of certifying is not + // yet known. Therefore the transaction must roll back + // and go through replay either to replay and commit the whole + // transaction or to determine failed certification status. + assert(ordered()); + client_context_.will_replay(*this); + state(lock, s_must_abort); + state(lock, s_must_replay); + break; + case WSREP_TRX_FAIL: + state(lock, s_cert_failed); + client_context_.override_error(trrep::e_deadlock_error); + break; + case WSREP_SIZE_EXCEEDED: + state(lock, s_must_abort); + client_context_.override_error(trrep::e_error_during_commit); + break; + case WSREP_CONN_FAIL: + case WSREP_NODE_FAIL: + state(lock, s_must_abort); + client_context_.override_error(trrep::e_error_during_commit); + break; + case WSREP_FATAL: + client_context_.abort(); + break; + case WSREP_NOT_IMPLEMENTED: + case WSREP_NOT_ALLOWED: + client_context_.override_error(trrep::e_error_during_commit); + state(lock, s_must_abort); + assert(0); + break; + default: + client_context_.override_error(trrep::e_error_during_commit); + break; + } + + return ret; +} + +void trrep::transaction_context::remove_fragments() +{ + throw trrep::not_implemented_error(); +} + +void trrep::transaction_context::clear_fragments() +{ + throw trrep::not_implemented_error(); +} + +void trrep::transaction_context::cleanup() +{ + std::cerr << "Cleanup transaction " + << client_context_.id().get() + << ": " << id_.get() << "\n"; + id_ = trrep::transaction_id::invalid(); + state_ = s_executing; + state_hist_.clear(); + trx_meta_.gtid = WSREP_GTID_UNDEFINED; + trx_meta_.stid.node = WSREP_UUID_UNDEFINED; + trx_meta_.stid.trx = trrep::transaction_id::invalid(); + trx_meta_.stid.conn = trrep::client_id::invalid(); + certified_ = false; + pa_unsafe_ = false; +} diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp new file mode 100644 index 0000000..61093f9 --- /dev/null +++ b/src/transaction_context.hpp @@ -0,0 +1,188 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_TRANSACTION_CONTEXT_HPP +#define TRREP_TRANSACTION_CONTEXT_HPP + +#include "provider.hpp" +#include "lock.hpp" + +#include + +#include +#include + +namespace trrep +{ + class client_context; + class key; + class data; + + class transaction_id + { + public: + template + transaction_id(I id) + : id_(static_cast(id)) + { } + wsrep_trx_id_t get() const { return id_; } + static wsrep_trx_id_t invalid() { return wsrep_trx_id_t(-1); } + bool operator==(const transaction_id& other) const + { return (id_ == other.id_); } + bool operator!=(const transaction_id& other) const + { return (id_ != other.id_); } + private: + wsrep_trx_id_t id_; + }; + + class transaction_context + { + public: + enum state + { + s_executing, + s_certifying, + s_committing, + s_ordered_commit, + s_committed, + s_cert_failed, + s_must_abort, + s_aborting, + s_aborted, + s_must_replay, + s_replaying + }; + static const int n_states = s_replaying + 1; + transaction_context(trrep::provider& provider, + trrep::client_context& client_context) + : provider_(provider) + , client_context_(client_context) + , id_(transaction_id::invalid()) + , state_(s_executing) + , state_hist_() + , ws_handle_() + , trx_meta_() + , pa_unsafe_(false) + , certified_(false) + { } + +#if 0 + transaction_context(trrep::provider& provider, + trrep::client_context& client_context, + const transaction_id& id) + : provider_(provider) + , client_context_(client_context) + , id_(id) + , state_(s_executing) + , ws_handle_() + , gtid_() + { } +#endif + // Accessors + trrep::transaction_id id() const + { return id_; } + + bool active() const + { return (id_ != trrep::transaction_id::invalid()); } + + enum state state() const + { return state_; } + + void state(trrep::unique_lock&, enum state); + + // Return true if the certification of the last + // fragment succeeded + bool certified() { return certified_; } + + // Return true if the last fragment was ordered by the + // provider + bool ordered() { return (trx_meta_.gtid.seqno > 0); } + + bool is_streaming() const + { + // Streaming support not yet implemented + return false; + } + + bool pa_unsafe() const { return pa_unsafe_; } + void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } + // + int start_transaction(const trrep::transaction_id& id) + { + assert(active() == false); + id_ = id; + ws_handle_.trx_id = id_.get(); + return provider_.start_transaction(&ws_handle_); + } + + int append_key(const trrep::key&); + + int append_data(const trrep::data&); + + int after_row(); + + int before_prepare(); + + int after_prepare(); + + int before_commit(); + + int ordered_commit(); + + int after_commit(); + + int before_rollback(); + + int after_rollback(); + + int before_statement(); + + int after_statement(); + + private: + uint32_t flags() const + { + uint32_t ret(0); + if (is_streaming() == false) ret |= WSREP_FLAG_TRX_START; + if (pa_unsafe()) ret |= WSREP_FLAG_PA_UNSAFE; + return ret; + } + int certify_fragment(); + int certify_commit(trrep::unique_lock&); + void remove_fragments(); + void clear_fragments(); + void cleanup(); + trrep::provider& provider_; + trrep::client_context& client_context_; + trrep::transaction_id id_; + enum state state_; + std::vector state_hist_; + wsrep_ws_handle_t ws_handle_; + wsrep_trx_meta_t trx_meta_; + bool pa_unsafe_; + bool certified_; + }; + + static inline std::string to_string(enum trrep::transaction_context::state state) + { + switch (state) + { + case trrep::transaction_context::s_executing: return "executing"; + case trrep::transaction_context::s_certifying: return "certifying"; + case trrep::transaction_context::s_committing: return "committing"; + case trrep::transaction_context::s_ordered_commit: return "ordered_commit"; + case trrep::transaction_context::s_committed: return "committed"; + case trrep::transaction_context::s_cert_failed: return "cert_failed"; + case trrep::transaction_context::s_must_abort: return "must_abort"; + case trrep::transaction_context::s_aborting: return "aborting"; + case trrep::transaction_context::s_aborted: return "aborted"; + case trrep::transaction_context::s_must_replay: return "must_replay"; + case trrep::transaction_context::s_replaying: return "replaying"; + default: return "unknown"; + } + } + +} + +#endif // TRREP_TRANSACTION_CONTEXT_HPP diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp new file mode 100644 index 0000000..f24cefe --- /dev/null +++ b/src/transaction_context_test.cpp @@ -0,0 +1,349 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "transaction_context.hpp" +#include "client_context.hpp" +#include "server_context.hpp" +#include "provider.hpp" +#include "mock_provider_impl.hpp" + +#include + +// +// Utility functions +// +namespace +{ + // Simple BF abort method to BF abort unordered transasctions + void bf_abort_unordered(trrep::client_context& cc, + trrep::transaction_context& tc) + { + trrep::unique_lock lock(cc.mutex()); + tc.state(lock, trrep::transaction_context::s_must_abort); + } + + // BF abort method to abort transactions via provider + void bf_abort_provider(trrep::mock_provider_impl& provider, + const trrep::client_context& cc, + const trrep::transaction_context& tc, + wsrep_seqno_t seqno) + { + provider.bf_abort(cc.id().get(), tc.id().get(), seqno); + } + + +} + +// +// Test a succesful 1PC transaction lifecycle +// +BOOST_AUTO_TEST_CASE(transaction_context_1pc) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Run before commit + BOOST_REQUIRE(tc.before_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing); + + // Run ordered commit + BOOST_REQUIRE(tc.ordered_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); + + // Run after commit + BOOST_REQUIRE(tc.after_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); + +} + +// +// Test a succesful 2PC transaction lifecycle +// +BOOST_AUTO_TEST_CASE(transaction_context_2pc) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Run before prepare + BOOST_REQUIRE(tc.before_prepare() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Run after prepare + BOOST_REQUIRE(tc.after_prepare() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing); + + // Run before commit + BOOST_REQUIRE(tc.before_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing); + + // Run ordered commit + BOOST_REQUIRE(tc.ordered_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); + + // Run after commit + BOOST_REQUIRE(tc.after_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); +} + +// +// Test a 1PC transaction which gets BF aborted before before_commit +// +BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + bf_abort_unordered(cc, tc); + + // Run before commit + BOOST_REQUIRE(tc.before_commit()); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_abort); + BOOST_REQUIRE(tc.certified() == false); + BOOST_REQUIRE(tc.ordered() == false); + + // Rollback sequence + BOOST_REQUIRE(tc.before_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting); + BOOST_REQUIRE(tc.after_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); +} + +// +// Test a 2PC transaction which gets BF aborted before before_prepare +// +BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_prepare) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + bf_abort_unordered(cc, tc); + + // Run before commit + BOOST_REQUIRE(tc.before_prepare()); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_abort); + BOOST_REQUIRE(tc.certified() == false); + BOOST_REQUIRE(tc.ordered() == false); + + // Rollback sequence + BOOST_REQUIRE(tc.before_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting); + BOOST_REQUIRE(tc.after_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); +} + +// +// Test a 2PC transaction which gets BF aborted before before_prepare +// +BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_after_prepare) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Run before prepare + BOOST_REQUIRE(tc.before_prepare() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + bf_abort_unordered(cc, tc); + + // Run before commit + BOOST_REQUIRE(tc.after_prepare()); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_abort); + BOOST_REQUIRE(tc.certified() == false); + BOOST_REQUIRE(tc.ordered() == false); + + // Rollback sequence + BOOST_REQUIRE(tc.before_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting); + BOOST_REQUIRE(tc.after_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); +} + +// +// Test a 1PC transaction which gets BF aborted during before_commit via +// provider before the write set was ordered and certified. +// +BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + bf_abort_provider(mock_provider, cc, tc, WSREP_SEQNO_UNDEFINED); + + // Run before commit + BOOST_REQUIRE(tc.before_commit()); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_cert_failed); + BOOST_REQUIRE(tc.certified() == false); + BOOST_REQUIRE(tc.ordered() == false); + + // Rollback sequence + BOOST_REQUIRE(tc.before_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting); + BOOST_REQUIRE(tc.after_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); +} + + +// +// Test a 1PC transaction which gets BF aborted during before_commit via +// provider after the write set was ordered and certified. +// +BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified) +{ + trrep::mock_provider_impl mock_provider; + trrep::provider provider(&mock_provider); + trrep::server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::transaction_context tc(provider, cc); + + // Verify initial state + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + // Start a new transaction with ID 1 + tc.start_transaction(1); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + + bf_abort_provider(mock_provider, cc, tc, 1); + + // Run before commit + BOOST_REQUIRE(tc.before_commit()); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_replay); + BOOST_REQUIRE(tc.certified() == false); + BOOST_REQUIRE(tc.ordered() == true); + + // Rollback sequence + BOOST_REQUIRE(tc.before_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_replay); + BOOST_REQUIRE(tc.after_rollback() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_must_replay); + + // Cleanup after statement + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.ordered() == false); + BOOST_REQUIRE(tc.certified() == false); +} diff --git a/src/trrep_test.cpp b/src/trrep_test.cpp new file mode 100644 index 0000000..668f01b --- /dev/null +++ b/src/trrep_test.cpp @@ -0,0 +1,7 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#define BOOST_TEST_MODULE trrep_test +#include +