diff --git a/CMakeLists.txt b/CMakeLists.txt index 78e1fbd..672b7b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,6 +18,7 @@ find_package(Boost 1.54.0 REQUIRED unit_test_framework program_options filesystem + thread ) # Coverage diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3af900b..a9dbcdc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -27,5 +27,5 @@ add_test(NAME trrep_test add_executable(dbms_simulator dbms_simulator.cpp) -target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY}) +target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY}) set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14) diff --git a/src/client_context.cpp b/src/client_context.cpp index 5581249..ef6e258 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -4,6 +4,11 @@ #include "client_context.hpp" #include "transaction_context.hpp" +#include "compiler.hpp" + +#include +#include + trrep::provider& trrep::client_context::provider() const { @@ -12,6 +17,7 @@ trrep::provider& trrep::client_context::provider() const int trrep::client_context::before_command() { + trrep::unique_lock lock(mutex_); assert(state_ == s_idle); if (server_context_.rollback_mode() == trrep::server_context::rm_sync) { @@ -26,7 +32,7 @@ int trrep::client_context::before_command() // cond_.wait(lock); } } - state_ = s_exec; + state(lock, s_exec); if (transaction_.state() == trrep::transaction_context::s_must_abort) { return 1; @@ -45,7 +51,7 @@ int trrep::client_context::after_command() lock.lock(); ret = 1; } - state_ = s_idle; + state(lock, s_idle); return ret; } @@ -72,5 +78,33 @@ int trrep::client_context::after_statement() * \todo Check for replay state, do rollback if requested. */ #endif // 0 + transaction_.after_statement(); return 0; } + +// Private + +void trrep::client_context::state( + trrep::unique_lock& lock TRREP_UNUSED, + enum trrep::client_context::state state) +{ + assert(lock.owns_lock()); + char allowed[state_max_][state_max_] = + { + /* idle exec quit */ + { 0, 1, 1}, /* idle */ + { 1, 0, 1}, /* exec */ + { 0, 0, 0} + }; + if (allowed[state_][state]) + { + state_ = state; + } + else + { + std::ostringstream os; + os << "client_context: Unallowed state transition: " + << state_ << " -> " << state << "\n"; + throw trrep::runtime_error(os.str()); + } +} diff --git a/src/client_context.hpp b/src/client_context.hpp index 821ab57..3f0c6d3 100644 --- a/src/client_context.hpp +++ b/src/client_context.hpp @@ -113,10 +113,14 @@ namespace trrep s_quitting }; + const static int state_max_ = s_quitting + 1; /*! * Destructor. */ - virtual ~client_context() { } + virtual ~client_context() + { + assert(transaction_.active() == false); + } /*! * Virtual method which should be called before the client @@ -172,6 +176,68 @@ namespace trrep */ virtual int after_statement(); + int start_transaction(const trrep::transaction_id& id) + { + assert(state_ == s_exec); + return transaction_.start_transaction(id); + } + + int start_transaction(const wsrep_ws_handle_t& wsh, + const wsrep_trx_meta_t& meta, + uint32_t flags) + { + assert(mode_ == m_applier); + return transaction_.start_transaction(wsh, meta, flags); + } + + int append_key(const trrep::key& key) + { + assert(state_ == s_exec); + return transaction_.append_key(key); + } + + int append_data(const trrep::data& data) + { + assert(state_ == s_exec); + return transaction_.append_data(data); + } + int before_prepare() + { + assert(state_ == s_exec); + return transaction_.before_prepare(); + } + int after_prepare() + { + assert(state_ == s_exec); + return transaction_.after_prepare(); + } + int before_commit() + { + assert(state_ == s_exec); + return transaction_.before_commit(); + } + + int ordered_commit() + { + assert(state_ == s_exec); + return transaction_.ordered_commit(); + } + + int after_commit() + { + assert(state_ == s_exec); + return transaction_.after_commit(); + } + int before_rollback() + { + assert(state_ == s_exec); + return transaction_.before_rollback(); + } + int after_rollback() + { + assert(state_ == s_exec); + return transaction_.after_rollback(); + } /*! * Get reference to the client mutex. * @@ -213,6 +279,10 @@ namespace trrep */ enum mode mode() const { return mode_; } + trrep::transaction_context& transaction() + { + return transaction_; + } protected: /*! * Client context constuctor. This is protected so that it @@ -232,6 +302,9 @@ namespace trrep { } private: + client_context(const client_context&); + client_context& operator=(client_context&); + /* * Friend declarations */ @@ -250,6 +323,11 @@ namespace trrep */ enum state state() const { return state_; } + /*! + * Set client state. + */ + void state(trrep::unique_lock& lock, enum state state); + /*! * Virtual method to return true if the client operates * in two phase commit mode. @@ -324,14 +402,34 @@ namespace trrep * */ virtual void override_error(const trrep::client_error&) = 0; + + /*! + * Return true if the current client operation was killed. + */ virtual bool killed() const = 0; + + /*! + * Abort server operation on fatal error. This should be used + * only for critical conditions which would sacrifice data + * consistency. + */ virtual void abort() const = 0; + + /*! + * Set up thread global variables for client connection. + */ virtual void store_globals() = 0; - // Debug helpers + + /*! + * Enter debug synchronization point. + */ virtual void debug_sync(const std::string&) = 0; + + /*! + * + */ virtual void debug_suicide(const std::string&) = 0; - void state(enum state state); trrep::mutex& mutex_; trrep::server_context& server_context_; diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index 3c1ead8..94bad25 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -19,12 +19,14 @@ #include #include +#include #include #include #include #include -#include +#include +// #include class dbms_server; @@ -99,12 +101,6 @@ class dbms_client; class dbms_server : public trrep::server_context { public: - enum state - { - s_disconnected, - s_connected, - s_synced - }; dbms_server(dbms_simulator& simulator, const std::string& name, const std::string& id, @@ -116,7 +112,6 @@ public: , simulator_(simulator) , mutex_() , cond_() - , state_(s_disconnected) , last_client_id_(0) , last_transaction_id_(0) , appliers_() @@ -131,7 +126,7 @@ public: void start_applier() { trrep::unique_lock lock(mutex_); - appliers_.push_back(std::thread(&dbms_server::applier_thread, this)); + appliers_.push_back(boost::thread(&dbms_server::applier_thread, this)); } void stop_applier() @@ -177,12 +172,11 @@ private: dbms_simulator& simulator_; trrep::default_mutex mutex_; trrep::default_condition_variable cond_; - enum state state_; std::atomic last_client_id_; std::atomic last_transaction_id_; - std::vector appliers_; + std::vector appliers_; std::map> clients_; - std::vector client_threads_; + std::vector client_threads_; }; class dbms_client : public trrep::client_context @@ -196,13 +190,10 @@ public: , mutex_() , server_(server) , n_transactions_(n_transactions) - , result_() { } - ~dbms_client() - { - std::cout << "Result: " << result_; - } + ~dbms_client() { } + void start() { for (size_t i(0); i < n_transactions_; ++i) @@ -257,12 +248,18 @@ private: void run_one_transaction() { - trrep::transaction_context trx(*this); - trx.start_transaction(server_.next_transaction_id()); - std::ostringstream os; + before_command(); + before_statement(); + start_transaction(server_.next_transaction_id()); + after_statement(); + after_command(); + int err(0); for (int i(0); i < 1 && err == 0; ++i) { + std::ostringstream os; + before_command(); + before_statement(); int data(std::rand() % 10000000); os << data; trrep::key key; @@ -270,24 +267,30 @@ private: wsrep_conn_id_t client_id(id().get()); key.append_key_part(&client_id, sizeof(client_id)); key.append_key_part(&data, sizeof(data)); - err = trx.append_key(key); - err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size())); + err = append_key(key); + err = err || append_data(trrep::data(os.str().c_str(), os.str().size())); + after_statement(); + after_command(); } + + before_command(); + before_statement(); // std::cout << "append_data: " << err << "\n"; - if (do_2pc()) + if (err == 0 && do_2pc()) { - err = err || trx.before_prepare(); + err = err || before_prepare(); // std::cout << "before_prepare: " << err << "\n"; - err = err || trx.after_prepare(); + err = err || after_prepare(); // std::cout << "after_prepare: " << err << "\n"; } - err = err || trx.before_commit(); + err = err || before_commit(); // std::cout << "before_commit: " << err << "\n"; - err = err || trx.ordered_commit(); + err = err || ordered_commit(); // std::cout << "ordered_commit: " << err << "\n"; - err = err || trx.after_commit(); + err = err || after_commit(); // std::cout << "after_commit: " << err << "\n"; - trx.after_statement(); + after_statement(); + after_command(); } void report_progress(size_t i) const @@ -303,7 +306,6 @@ private: trrep::default_mutex mutex_; dbms_server& server_; const size_t n_transactions_; - size_t result_; }; @@ -353,7 +355,7 @@ void dbms_server::start_client(size_t id) trrep::client_context::m_replicating, simulator_.params().n_transactions)); client_threads_.push_back( - std::thread(&dbms_server::client_thread, this, client)); + boost::thread(&dbms_server::client_thread, this, client)); } diff --git a/src/mock_client_context.hpp b/src/mock_client_context.hpp index 9339843..385a2ec 100644 --- a/src/mock_client_context.hpp +++ b/src/mock_client_context.hpp @@ -25,6 +25,13 @@ namespace trrep , do_2pc_(do_2pc) , fail_next_applying_() { } + ~mock_client_context() + { + if (transaction().active()) + { + (void)rollback(transaction()); + } + } int apply(trrep::transaction_context&, const trrep::data&); int commit(trrep::transaction_context&); int rollback(trrep::transaction_context&); diff --git a/src/mock_utils.cpp b/src/mock_utils.cpp index 3f26a08..edc8389 100644 --- a/src/mock_utils.cpp +++ b/src/mock_utils.cpp @@ -25,7 +25,7 @@ void trrep_mock::bf_abort_provider(trrep::mock_server_context& sc, (void)victim_seqno; } -trrep::transaction_context trrep_mock::applying_transaction( +trrep::transaction_context& trrep_mock::start_applying_transaction( trrep::client_context& cc, const trrep::transaction_id& id, wsrep_seqno_t seqno, @@ -33,10 +33,14 @@ trrep::transaction_context trrep_mock::applying_transaction( { wsrep_ws_handle_t ws_handle = { id.get(), 0 }; wsrep_trx_meta_t meta = { - { {1 }, seqno }, /* gtid */ - { { static_cast(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */ - seqno - 1 + { {1 }, seqno }, /* gtid */ + { { static_cast(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */ + seqno - 1 }; - trrep::transaction_context ret(cc, ws_handle, meta, flags); - return ret; + int ret(cc.start_transaction(ws_handle, meta, flags)); + if (ret != 0) + { + throw trrep::runtime_error("failed to start applying transaction"); + } + return cc.transaction(); } diff --git a/src/mock_utils.hpp b/src/mock_utils.hpp index 57e738b..a14286f 100644 --- a/src/mock_utils.hpp +++ b/src/mock_utils.hpp @@ -28,7 +28,7 @@ namespace trrep_mock const trrep::transaction_context& tc, wsrep_seqno_t bf_seqno); - trrep::transaction_context applying_transaction( + trrep::transaction_context& start_applying_transaction( trrep::client_context& cc, const trrep::transaction_id& id, wsrep_seqno_t seqno, diff --git a/src/server_context.cpp b/src/server_context.cpp index 361a872..d832482 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -113,12 +113,13 @@ namespace assert(client_context->mode() == trrep::client_context::m_applier); trrep::data data(buf->ptr, buf->len); - trrep::transaction_context transaction_context(*client_context, - *wsh, - *meta, - flags); - if (client_context->server_context().on_apply( - *client_context, transaction_context, data)) + if (client_context->start_transaction(*wsh, *meta, flags)) + { + ret = WSREP_CB_FAILURE; + } + if (ret == WSREP_CB_SUCCESS && + client_context->server_context().on_apply( + *client_context, client_context->transaction(), data)) { ret = WSREP_CB_FAILURE; } @@ -294,11 +295,13 @@ int trrep::server_context::on_apply( assert(0); } - transaction_context.after_statement(); if (ret) { client_context.rollback(transaction_context); } + + transaction_context.after_statement(); + assert(transaction_context.active() == false); return ret; } diff --git a/src/server_context.hpp b/src/server_context.hpp index d48c3cf..f6db5c4 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -21,7 +21,11 @@ * State Snapshot Transfer * ----------------------- * - * TODO + * Depending on SST type (physical or logical), the server storage + * engine initialization must be done before or after SST happens. + * In case of physical SST method (typically rsync, filesystem snapshot) + * the SST happens before the storage engine is initialized, in case + * of logical backup typically after the storage engine initialization. * * Rollback Mode * ------------- @@ -52,14 +56,6 @@ * to be aborted or in case of fully optimistic concurrency control, * the conflict is detected at commit. * - * SST - * === - * - * Depending on SST type (physical or logical), the server storage - * engine initialization must be done before or after SST happens. - * In case of physical SST method (typically rsync, filesystem snapshot) - * the SST happens before the storage engine is initialized, in case - * of logical backup typically after the storage engine initialization. */ #ifndef TRREP_SERVER_CONTEXT_HPP @@ -135,7 +131,9 @@ namespace trrep /*! Server is donating state snapshot transfer */ s_donor, /*! Server has synced with the cluster */ - s_synced + s_synced, + /*! Server is disconnecting from group */ + s_disconnecting }; /*! * Rollback Mode enumeration @@ -252,6 +250,10 @@ namespace trrep * Wait until server reaches given state. * * \todo Waiting for transitional states may not be reliable. + * Should introduce array of waiter counts per state, + * on state change the state changing thread is + * not allowed to proceed until all waiters have + * woken up. */ void wait_until_state(trrep::server_context::state) const; @@ -261,6 +263,7 @@ namespace trrep * engine initialization, false otherwise. */ virtual bool sst_before_init() const = 0; + /*! * Virtual method which will be called on *joiner* when the provider * requests the SST request information. This method should @@ -329,6 +332,7 @@ namespace trrep virtual bool statement_allowed_for_streaming( const trrep::client_context& client_context, const trrep::transaction_context& transaction_context) const; + protected: /*! Server Context constructor * diff --git a/src/server_context_test.cpp b/src/server_context_test.cpp index a80ea5d..a29ae9b 100644 --- a/src/server_context_test.cpp +++ b/src/server_context_test.cpp @@ -16,8 +16,8 @@ BOOST_AUTO_TEST_CASE(server_context_applying_1pc) trrep::client_id(1), trrep::client_context::m_applier, false); - trrep::transaction_context tc( - trrep_mock::applying_transaction( + trrep::transaction_context& tc( + trrep_mock::start_applying_transaction( cc, 1, 1, WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); char buf[1] = { 1 }; @@ -34,8 +34,8 @@ BOOST_AUTO_TEST_CASE(server_context_applying_2pc) trrep::client_id(1), trrep::client_context::m_applier, true); - trrep::transaction_context tc( - trrep_mock::applying_transaction( + trrep::transaction_context& tc( + trrep_mock::start_applying_transaction( cc, 1, 1, WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); char buf[1] = { 1 }; @@ -54,11 +54,12 @@ BOOST_AUTO_TEST_CASE(server_context_applying_1pc_rollback) trrep::client_context::m_applier, false); cc.fail_next_applying(true); - trrep::transaction_context tc( - trrep_mock::applying_transaction( + trrep::transaction_context& tc( + trrep_mock::start_applying_transaction( cc, 1, 1, WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); char buf[1] = { 1 }; + BOOST_REQUIRE(sc.on_apply(cc, tc, trrep::data(buf, 1)) == 1); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); } @@ -74,8 +75,8 @@ BOOST_AUTO_TEST_CASE(server_context_applying_2pc_rollback) trrep::client_context::m_applier, true); cc.fail_next_applying(true); - trrep::transaction_context tc( - trrep_mock::applying_transaction( + trrep::transaction_context& tc( + trrep_mock::start_applying_transaction( cc, 1, 1, WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); char buf[1] = { 1 }; diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 099301e..ef22bd5 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -7,6 +7,7 @@ #include "server_context.hpp" #include "key.hpp" #include "data.hpp" +#include "compiler.hpp" #include // TODO: replace with proper logging utility #include @@ -30,40 +31,9 @@ trrep::transaction_context::transaction_context( , rollback_replicated_for_(false) { } -trrep::transaction_context::transaction_context( - trrep::client_context& client_context, - const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta, - uint32_t flags) - : provider_(client_context.provider()) - , client_context_(client_context) - , id_(transaction_id::invalid()) - , state_(s_executing) - , state_hist_() - , ws_handle_(ws_handle) - , trx_meta_(trx_meta) - , flags_(flags) - , pa_unsafe_() - , certified_(true) - , fragments_() - , rollback_replicated_for_(false) -{ } - trrep::transaction_context::~transaction_context() -{ - if (active()) - { - try - { - (void)client_context_.rollback(*this); - } - catch (...) - { - // TODO: Log warning - } - } -} +{ } int trrep::transaction_context::start_transaction( const trrep::transaction_id& id) @@ -86,6 +56,21 @@ int trrep::transaction_context::start_transaction( } } +int trrep::transaction_context::start_transaction( + const wsrep_ws_handle_t& ws_handle, + const wsrep_trx_meta_t& trx_meta, + uint32_t flags) +{ + assert(active() == false); + assert(client_context_.mode() == trrep::client_context::m_applier); + state_ = s_executing; + ws_handle_ = ws_handle; + trx_meta_ = trx_meta; + flags_ = flags; + certified_ = true; + return 0; +} + int trrep::transaction_context::append_key(const trrep::key& key) { @@ -104,7 +89,7 @@ int trrep::transaction_context::before_prepare() int ret(0); trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("before_prepare_enter"); assert(state() == s_executing || state() == s_must_abort); if (state() == s_must_abort) @@ -144,7 +129,7 @@ int trrep::transaction_context::before_prepare() } assert(state() == s_preparing); - debug_log_state(); + debug_log_state("before_prepare_leave"); return ret; } @@ -152,7 +137,7 @@ int trrep::transaction_context::after_prepare() { int ret(1); trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("after_prepare_enter"); assert(state() == s_preparing || state() == s_must_abort); if (state() == s_must_abort) { @@ -184,7 +169,7 @@ int trrep::transaction_context::after_prepare() ret = 0; break; } - debug_log_state(); + debug_log_state("after_prepare_leave"); return ret; } @@ -193,7 +178,7 @@ int trrep::transaction_context::before_commit() int ret(1); trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("before_commit_enter"); assert(state() == s_executing || state() == s_committing || state() == s_must_abort); @@ -264,7 +249,7 @@ int trrep::transaction_context::before_commit() } break; } - debug_log_state(); + debug_log_state("before_commit_leave"); return ret; } @@ -273,14 +258,14 @@ int trrep::transaction_context::ordered_commit() int ret(1); trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("ordered_commit_enter"); assert(state() == s_committing); assert(ordered()); ret = provider_.commit_order_leave(&ws_handle_, &trx_meta_); // Should always succeed assert(ret == 0); state(lock, s_ordered_commit); - debug_log_state(); + debug_log_state("ordered_commit_leave"); return ret; } @@ -289,7 +274,7 @@ int trrep::transaction_context::after_commit() int ret(0); trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("after_commit_enter"); assert(state() == s_ordered_commit); switch (client_context_.mode()) @@ -309,14 +294,14 @@ int trrep::transaction_context::after_commit() } assert(ret == 0); state(lock, s_committed); - debug_log_state(); + debug_log_state("after_commit_leave"); return ret; } int trrep::transaction_context::before_rollback() { trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("before_rollback_enter"); assert(state() == s_executing || state() == s_must_abort || state() == s_cert_failed || @@ -362,14 +347,14 @@ int trrep::transaction_context::before_rollback() assert(0); break; } - debug_log_state(); + debug_log_state("before_rollback_leave"); return 0; } int trrep::transaction_context::after_rollback() { trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("after_rollback_enter"); assert(state() == s_aborting || state() == s_must_replay); @@ -384,7 +369,7 @@ int trrep::transaction_context::after_rollback() // during actual rollback. If the transaction has been ordered, // releasing the commit ordering critical section should be // also postponed until all resources have been released. - debug_log_state(); + debug_log_state("after_rollback_leave"); return 0; } @@ -392,7 +377,7 @@ int trrep::transaction_context::after_statement() { int ret(0); trrep::unique_lock lock(client_context_.mutex()); - debug_log_state(); + debug_log_state("after_statement_enter"); assert(state() == s_executing || state() == s_committed || state() == s_aborted || @@ -446,7 +431,7 @@ int trrep::transaction_context::after_statement() cleanup(); } - debug_log_state(); + debug_log_state("after_statement_leave"); return ret; } @@ -458,7 +443,7 @@ void trrep::transaction_context::state( { assert(lock.owns_lock()); static const char allowed[n_states][n_states] = - { /* ex pr ce co oc ct cf ma ab ad mr re from/to */ + { /* ex pr ce co oc ct cf ma ab ad mr re */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ { 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */ { 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ @@ -700,9 +685,7 @@ void trrep::transaction_context::clear_fragments() void trrep::transaction_context::cleanup() { - // std::cerr << "Cleanup transaction " - // << client_context_.id().get() - // << ": " << id_.get() << "\n"; + debug_log_state("cleanup_enter"); id_ = trrep::transaction_id::invalid(); if (is_streaming()) { @@ -715,11 +698,17 @@ void trrep::transaction_context::cleanup() trx_meta_.stid.conn = trrep::client_id::invalid(); certified_ = false; pa_unsafe_ = false; + debug_log_state("cleanup_leave"); } -void trrep::transaction_context::debug_log_state() const +void trrep::transaction_context::debug_log_state( + const std::string& context TRREP_UNUSED) + const { - // std::cout << "client: " << client_context_.id().get() - // << " trx: " << id_.get() - // << " state: " << state_ << "\n"; +#if 0 + std::cout << context + << ": client: " << client_context_.id().get() + << " trx: " << int64_t(id_.get()) + << " state: " << trrep::to_string(state_) << "\n"; +#endif /* 0 */ } diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index 4786232..7f92484 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -6,6 +6,7 @@ #define TRREP_TRANSACTION_CONTEXT_HPP #include "provider.hpp" +#include "server_context.hpp" #include "lock.hpp" #include @@ -55,11 +56,10 @@ namespace trrep s_replaying }; static const int n_states = s_replaying + 1; + enum state state() const + { return state_; } + transaction_context(trrep::client_context& client_context); - transaction_context(trrep::client_context& client_context, - const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta, - uint32_t flags); ~transaction_context(); // Accessors trrep::transaction_id id() const @@ -68,8 +68,6 @@ namespace trrep bool active() const { return (id_ != trrep::transaction_id::invalid()); } - enum state state() const - { return state_; } void state(trrep::unique_lock&, enum state); @@ -99,6 +97,10 @@ namespace trrep int start_transaction(const trrep::transaction_id& id); + int start_transaction(const wsrep_ws_handle_t& ws_handle, + const wsrep_trx_meta_t& trx_meta, + uint32_t flags); + int append_key(const trrep::key&); int append_data(const trrep::data&); @@ -128,12 +130,15 @@ namespace trrep return flags_; } private: + transaction_context(const transaction_context&); + transaction_context operator=(const transaction_context&); + int certify_fragment(trrep::unique_lock&); int certify_commit(trrep::unique_lock&); void remove_fragments(); void clear_fragments(); void cleanup(); - void debug_log_state() const; + void debug_log_state(const std::string&) const; trrep::provider& provider_; trrep::client_context& client_context_; trrep::transaction_id id_; diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index ac6afb8..8e3f1a2 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc) trrep::server_context::rm_sync); trrep::mock_client_context cc(sc,trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc) trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -108,7 +108,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_rollback) trrep::server_context::rm_sync); trrep::mock_client_context cc(sc,trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -144,7 +144,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit) trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -185,7 +185,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare) trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -226,7 +226,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare) trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -272,7 +272,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -315,7 +315,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified) trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(cc); + trrep::transaction_context& tc(cc.transaction()); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -355,9 +355,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_applying) trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_applier); - trrep::transaction_context tc(trrep_mock::applying_transaction( - cc, 1, 1, - WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); + trrep::transaction_context& tc(trrep_mock::start_applying_transaction( + cc, 1, 1, + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.start_transaction() == 0); @@ -371,6 +371,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_applying) BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); BOOST_REQUIRE(tc.after_commit() == 0); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); + BOOST_REQUIRE(tc.active() == false); } BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying) @@ -380,9 +383,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying) trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_applier); - trrep::transaction_context tc(trrep_mock::applying_transaction( - cc, 1, 1, - WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); + trrep::transaction_context& tc(trrep_mock::start_applying_transaction( + cc, 1, 1, + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.start_transaction() == 0); @@ -400,6 +403,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying) BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); BOOST_REQUIRE(tc.after_commit() == 0); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); + BOOST_REQUIRE(tc.active() == false); } BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback) @@ -409,9 +415,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback) trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_applier); - trrep::transaction_context tc(trrep_mock::applying_transaction( - cc, 1, 1, - WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); + trrep::transaction_context& tc(trrep_mock::start_applying_transaction( + cc, 1, 1, + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.start_transaction() == 0); @@ -423,4 +429,8 @@ BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback) BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting); BOOST_REQUIRE(tc.after_rollback() == 0); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); + BOOST_REQUIRE(tc.after_statement() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); + BOOST_REQUIRE(tc.active() == false); + }