1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

Refactoring.

This commit is contained in:
Teemu Ollakka
2018-04-29 17:15:24 +03:00
parent c1f8f2c37d
commit c301c0896b
14 changed files with 307 additions and 149 deletions

View File

@ -18,6 +18,7 @@ find_package(Boost 1.54.0 REQUIRED
unit_test_framework unit_test_framework
program_options program_options
filesystem filesystem
thread
) )
# Coverage # Coverage

View File

@ -27,5 +27,5 @@ add_test(NAME trrep_test
add_executable(dbms_simulator add_executable(dbms_simulator
dbms_simulator.cpp) dbms_simulator.cpp)
target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY}) target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY})
set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14) set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14)

View File

@ -4,6 +4,11 @@
#include "client_context.hpp" #include "client_context.hpp"
#include "transaction_context.hpp" #include "transaction_context.hpp"
#include "compiler.hpp"
#include <sstream>
#include <iostream>
trrep::provider& trrep::client_context::provider() const trrep::provider& trrep::client_context::provider() const
{ {
@ -12,6 +17,7 @@ trrep::provider& trrep::client_context::provider() const
int trrep::client_context::before_command() int trrep::client_context::before_command()
{ {
trrep::unique_lock<trrep::mutex> lock(mutex_);
assert(state_ == s_idle); assert(state_ == s_idle);
if (server_context_.rollback_mode() == trrep::server_context::rm_sync) if (server_context_.rollback_mode() == trrep::server_context::rm_sync)
{ {
@ -26,7 +32,7 @@ int trrep::client_context::before_command()
// cond_.wait(lock); // cond_.wait(lock);
} }
} }
state_ = s_exec; state(lock, s_exec);
if (transaction_.state() == trrep::transaction_context::s_must_abort) if (transaction_.state() == trrep::transaction_context::s_must_abort)
{ {
return 1; return 1;
@ -45,7 +51,7 @@ int trrep::client_context::after_command()
lock.lock(); lock.lock();
ret = 1; ret = 1;
} }
state_ = s_idle; state(lock, s_idle);
return ret; return ret;
} }
@ -72,5 +78,33 @@ int trrep::client_context::after_statement()
* \todo Check for replay state, do rollback if requested. * \todo Check for replay state, do rollback if requested.
*/ */
#endif // 0 #endif // 0
transaction_.after_statement();
return 0; return 0;
} }
// Private
void trrep::client_context::state(
trrep::unique_lock<trrep::mutex>& lock TRREP_UNUSED,
enum trrep::client_context::state state)
{
assert(lock.owns_lock());
char allowed[state_max_][state_max_] =
{
/* idle exec quit */
{ 0, 1, 1}, /* idle */
{ 1, 0, 1}, /* exec */
{ 0, 0, 0}
};
if (allowed[state_][state])
{
state_ = state;
}
else
{
std::ostringstream os;
os << "client_context: Unallowed state transition: "
<< state_ << " -> " << state << "\n";
throw trrep::runtime_error(os.str());
}
}

View File

@ -113,10 +113,14 @@ namespace trrep
s_quitting s_quitting
}; };
const static int state_max_ = s_quitting + 1;
/*! /*!
* Destructor. * Destructor.
*/ */
virtual ~client_context() { } virtual ~client_context()
{
assert(transaction_.active() == false);
}
/*! /*!
* Virtual method which should be called before the client * Virtual method which should be called before the client
@ -172,6 +176,68 @@ namespace trrep
*/ */
virtual int after_statement(); virtual int after_statement();
int start_transaction(const trrep::transaction_id& id)
{
assert(state_ == s_exec);
return transaction_.start_transaction(id);
}
int start_transaction(const wsrep_ws_handle_t& wsh,
const wsrep_trx_meta_t& meta,
uint32_t flags)
{
assert(mode_ == m_applier);
return transaction_.start_transaction(wsh, meta, flags);
}
int append_key(const trrep::key& key)
{
assert(state_ == s_exec);
return transaction_.append_key(key);
}
int append_data(const trrep::data& data)
{
assert(state_ == s_exec);
return transaction_.append_data(data);
}
int before_prepare()
{
assert(state_ == s_exec);
return transaction_.before_prepare();
}
int after_prepare()
{
assert(state_ == s_exec);
return transaction_.after_prepare();
}
int before_commit()
{
assert(state_ == s_exec);
return transaction_.before_commit();
}
int ordered_commit()
{
assert(state_ == s_exec);
return transaction_.ordered_commit();
}
int after_commit()
{
assert(state_ == s_exec);
return transaction_.after_commit();
}
int before_rollback()
{
assert(state_ == s_exec);
return transaction_.before_rollback();
}
int after_rollback()
{
assert(state_ == s_exec);
return transaction_.after_rollback();
}
/*! /*!
* Get reference to the client mutex. * Get reference to the client mutex.
* *
@ -213,6 +279,10 @@ namespace trrep
*/ */
enum mode mode() const { return mode_; } enum mode mode() const { return mode_; }
trrep::transaction_context& transaction()
{
return transaction_;
}
protected: protected:
/*! /*!
* Client context constuctor. This is protected so that it * Client context constuctor. This is protected so that it
@ -232,6 +302,9 @@ namespace trrep
{ } { }
private: private:
client_context(const client_context&);
client_context& operator=(client_context&);
/* /*
* Friend declarations * Friend declarations
*/ */
@ -250,6 +323,11 @@ namespace trrep
*/ */
enum state state() const { return state_; } enum state state() const { return state_; }
/*!
* Set client state.
*/
void state(trrep::unique_lock<trrep::mutex>& lock, enum state state);
/*! /*!
* Virtual method to return true if the client operates * Virtual method to return true if the client operates
* in two phase commit mode. * in two phase commit mode.
@ -324,14 +402,34 @@ namespace trrep
* *
*/ */
virtual void override_error(const trrep::client_error&) = 0; virtual void override_error(const trrep::client_error&) = 0;
/*!
* Return true if the current client operation was killed.
*/
virtual bool killed() const = 0; virtual bool killed() const = 0;
/*!
* Abort server operation on fatal error. This should be used
* only for critical conditions which would sacrifice data
* consistency.
*/
virtual void abort() const = 0; virtual void abort() const = 0;
/*!
* Set up thread global variables for client connection.
*/
virtual void store_globals() = 0; virtual void store_globals() = 0;
// Debug helpers
/*!
* Enter debug synchronization point.
*/
virtual void debug_sync(const std::string&) = 0; virtual void debug_sync(const std::string&) = 0;
/*!
*
*/
virtual void debug_suicide(const std::string&) = 0; virtual void debug_suicide(const std::string&) = 0;
void state(enum state state);
trrep::mutex& mutex_; trrep::mutex& mutex_;
trrep::server_context& server_context_; trrep::server_context& server_context_;

View File

@ -19,12 +19,14 @@
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <map> #include <map>
#include <atomic> #include <atomic>
#include <thread> #include <chrono>
// #include <thread>
class dbms_server; class dbms_server;
@ -99,12 +101,6 @@ class dbms_client;
class dbms_server : public trrep::server_context class dbms_server : public trrep::server_context
{ {
public: public:
enum state
{
s_disconnected,
s_connected,
s_synced
};
dbms_server(dbms_simulator& simulator, dbms_server(dbms_simulator& simulator,
const std::string& name, const std::string& name,
const std::string& id, const std::string& id,
@ -116,7 +112,6 @@ public:
, simulator_(simulator) , simulator_(simulator)
, mutex_() , mutex_()
, cond_() , cond_()
, state_(s_disconnected)
, last_client_id_(0) , last_client_id_(0)
, last_transaction_id_(0) , last_transaction_id_(0)
, appliers_() , appliers_()
@ -131,7 +126,7 @@ public:
void start_applier() void start_applier()
{ {
trrep::unique_lock<trrep::mutex> lock(mutex_); trrep::unique_lock<trrep::mutex> lock(mutex_);
appliers_.push_back(std::thread(&dbms_server::applier_thread, this)); appliers_.push_back(boost::thread(&dbms_server::applier_thread, this));
} }
void stop_applier() void stop_applier()
@ -177,12 +172,11 @@ private:
dbms_simulator& simulator_; dbms_simulator& simulator_;
trrep::default_mutex mutex_; trrep::default_mutex mutex_;
trrep::default_condition_variable cond_; trrep::default_condition_variable cond_;
enum state state_;
std::atomic<size_t> last_client_id_; std::atomic<size_t> last_client_id_;
std::atomic<size_t> last_transaction_id_; std::atomic<size_t> last_transaction_id_;
std::vector<std::thread> appliers_; std::vector<boost::thread> appliers_;
std::map<size_t, std::unique_ptr<dbms_client>> clients_; std::map<size_t, std::unique_ptr<dbms_client>> clients_;
std::vector<std::thread> client_threads_; std::vector<boost::thread> client_threads_;
}; };
class dbms_client : public trrep::client_context class dbms_client : public trrep::client_context
@ -196,13 +190,10 @@ public:
, mutex_() , mutex_()
, server_(server) , server_(server)
, n_transactions_(n_transactions) , n_transactions_(n_transactions)
, result_()
{ } { }
~dbms_client() ~dbms_client() { }
{
std::cout << "Result: " << result_;
}
void start() void start()
{ {
for (size_t i(0); i < n_transactions_; ++i) for (size_t i(0); i < n_transactions_; ++i)
@ -257,12 +248,18 @@ private:
void run_one_transaction() void run_one_transaction()
{ {
trrep::transaction_context trx(*this); before_command();
trx.start_transaction(server_.next_transaction_id()); before_statement();
std::ostringstream os; start_transaction(server_.next_transaction_id());
after_statement();
after_command();
int err(0); int err(0);
for (int i(0); i < 1 && err == 0; ++i) for (int i(0); i < 1 && err == 0; ++i)
{ {
std::ostringstream os;
before_command();
before_statement();
int data(std::rand() % 10000000); int data(std::rand() % 10000000);
os << data; os << data;
trrep::key key; trrep::key key;
@ -270,24 +267,30 @@ private:
wsrep_conn_id_t client_id(id().get()); wsrep_conn_id_t client_id(id().get());
key.append_key_part(&client_id, sizeof(client_id)); key.append_key_part(&client_id, sizeof(client_id));
key.append_key_part(&data, sizeof(data)); key.append_key_part(&data, sizeof(data));
err = trx.append_key(key); err = append_key(key);
err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size())); err = err || append_data(trrep::data(os.str().c_str(), os.str().size()));
after_statement();
after_command();
} }
before_command();
before_statement();
// std::cout << "append_data: " << err << "\n"; // std::cout << "append_data: " << err << "\n";
if (do_2pc()) if (err == 0 && do_2pc())
{ {
err = err || trx.before_prepare(); err = err || before_prepare();
// std::cout << "before_prepare: " << err << "\n"; // std::cout << "before_prepare: " << err << "\n";
err = err || trx.after_prepare(); err = err || after_prepare();
// std::cout << "after_prepare: " << err << "\n"; // std::cout << "after_prepare: " << err << "\n";
} }
err = err || trx.before_commit(); err = err || before_commit();
// std::cout << "before_commit: " << err << "\n"; // std::cout << "before_commit: " << err << "\n";
err = err || trx.ordered_commit(); err = err || ordered_commit();
// std::cout << "ordered_commit: " << err << "\n"; // std::cout << "ordered_commit: " << err << "\n";
err = err || trx.after_commit(); err = err || after_commit();
// std::cout << "after_commit: " << err << "\n"; // std::cout << "after_commit: " << err << "\n";
trx.after_statement(); after_statement();
after_command();
} }
void report_progress(size_t i) const void report_progress(size_t i) const
@ -303,7 +306,6 @@ private:
trrep::default_mutex mutex_; trrep::default_mutex mutex_;
dbms_server& server_; dbms_server& server_;
const size_t n_transactions_; const size_t n_transactions_;
size_t result_;
}; };
@ -353,7 +355,7 @@ void dbms_server::start_client(size_t id)
trrep::client_context::m_replicating, trrep::client_context::m_replicating,
simulator_.params().n_transactions)); simulator_.params().n_transactions));
client_threads_.push_back( client_threads_.push_back(
std::thread(&dbms_server::client_thread, this, client)); boost::thread(&dbms_server::client_thread, this, client));
} }

View File

@ -25,6 +25,13 @@ namespace trrep
, do_2pc_(do_2pc) , do_2pc_(do_2pc)
, fail_next_applying_() , fail_next_applying_()
{ } { }
~mock_client_context()
{
if (transaction().active())
{
(void)rollback(transaction());
}
}
int apply(trrep::transaction_context&, const trrep::data&); int apply(trrep::transaction_context&, const trrep::data&);
int commit(trrep::transaction_context&); int commit(trrep::transaction_context&);
int rollback(trrep::transaction_context&); int rollback(trrep::transaction_context&);

View File

@ -25,7 +25,7 @@ void trrep_mock::bf_abort_provider(trrep::mock_server_context& sc,
(void)victim_seqno; (void)victim_seqno;
} }
trrep::transaction_context trrep_mock::applying_transaction( trrep::transaction_context& trrep_mock::start_applying_transaction(
trrep::client_context& cc, trrep::client_context& cc,
const trrep::transaction_id& id, const trrep::transaction_id& id,
wsrep_seqno_t seqno, wsrep_seqno_t seqno,
@ -37,6 +37,10 @@ trrep::transaction_context trrep_mock::applying_transaction(
{ { static_cast<uint8_t>(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */ { { static_cast<uint8_t>(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */
seqno - 1 seqno - 1
}; };
trrep::transaction_context ret(cc, ws_handle, meta, flags); int ret(cc.start_transaction(ws_handle, meta, flags));
return ret; if (ret != 0)
{
throw trrep::runtime_error("failed to start applying transaction");
}
return cc.transaction();
} }

View File

@ -28,7 +28,7 @@ namespace trrep_mock
const trrep::transaction_context& tc, const trrep::transaction_context& tc,
wsrep_seqno_t bf_seqno); wsrep_seqno_t bf_seqno);
trrep::transaction_context applying_transaction( trrep::transaction_context& start_applying_transaction(
trrep::client_context& cc, trrep::client_context& cc,
const trrep::transaction_id& id, const trrep::transaction_id& id,
wsrep_seqno_t seqno, wsrep_seqno_t seqno,

View File

@ -113,12 +113,13 @@ namespace
assert(client_context->mode() == trrep::client_context::m_applier); assert(client_context->mode() == trrep::client_context::m_applier);
trrep::data data(buf->ptr, buf->len); trrep::data data(buf->ptr, buf->len);
trrep::transaction_context transaction_context(*client_context, if (client_context->start_transaction(*wsh, *meta, flags))
*wsh, {
*meta, ret = WSREP_CB_FAILURE;
flags); }
if (client_context->server_context().on_apply( if (ret == WSREP_CB_SUCCESS &&
*client_context, transaction_context, data)) client_context->server_context().on_apply(
*client_context, client_context->transaction(), data))
{ {
ret = WSREP_CB_FAILURE; ret = WSREP_CB_FAILURE;
} }
@ -294,11 +295,13 @@ int trrep::server_context::on_apply(
assert(0); assert(0);
} }
transaction_context.after_statement();
if (ret) if (ret)
{ {
client_context.rollback(transaction_context); client_context.rollback(transaction_context);
} }
transaction_context.after_statement();
assert(transaction_context.active() == false);
return ret; return ret;
} }

View File

@ -21,7 +21,11 @@
* State Snapshot Transfer * State Snapshot Transfer
* ----------------------- * -----------------------
* *
* TODO * Depending on SST type (physical or logical), the server storage
* engine initialization must be done before or after SST happens.
* In case of physical SST method (typically rsync, filesystem snapshot)
* the SST happens before the storage engine is initialized, in case
* of logical backup typically after the storage engine initialization.
* *
* Rollback Mode * Rollback Mode
* ------------- * -------------
@ -52,14 +56,6 @@
* to be aborted or in case of fully optimistic concurrency control, * to be aborted or in case of fully optimistic concurrency control,
* the conflict is detected at commit. * the conflict is detected at commit.
* *
* SST
* ===
*
* Depending on SST type (physical or logical), the server storage
* engine initialization must be done before or after SST happens.
* In case of physical SST method (typically rsync, filesystem snapshot)
* the SST happens before the storage engine is initialized, in case
* of logical backup typically after the storage engine initialization.
*/ */
#ifndef TRREP_SERVER_CONTEXT_HPP #ifndef TRREP_SERVER_CONTEXT_HPP
@ -135,7 +131,9 @@ namespace trrep
/*! Server is donating state snapshot transfer */ /*! Server is donating state snapshot transfer */
s_donor, s_donor,
/*! Server has synced with the cluster */ /*! Server has synced with the cluster */
s_synced s_synced,
/*! Server is disconnecting from group */
s_disconnecting
}; };
/*! /*!
* Rollback Mode enumeration * Rollback Mode enumeration
@ -252,6 +250,10 @@ namespace trrep
* Wait until server reaches given state. * Wait until server reaches given state.
* *
* \todo Waiting for transitional states may not be reliable. * \todo Waiting for transitional states may not be reliable.
* Should introduce array of waiter counts per state,
* on state change the state changing thread is
* not allowed to proceed until all waiters have
* woken up.
*/ */
void wait_until_state(trrep::server_context::state) const; void wait_until_state(trrep::server_context::state) const;
@ -261,6 +263,7 @@ namespace trrep
* engine initialization, false otherwise. * engine initialization, false otherwise.
*/ */
virtual bool sst_before_init() const = 0; virtual bool sst_before_init() const = 0;
/*! /*!
* Virtual method which will be called on *joiner* when the provider * Virtual method which will be called on *joiner* when the provider
* requests the SST request information. This method should * requests the SST request information. This method should
@ -329,6 +332,7 @@ namespace trrep
virtual bool statement_allowed_for_streaming( virtual bool statement_allowed_for_streaming(
const trrep::client_context& client_context, const trrep::client_context& client_context,
const trrep::transaction_context& transaction_context) const; const trrep::transaction_context& transaction_context) const;
protected: protected:
/*! Server Context constructor /*! Server Context constructor
* *

View File

@ -16,8 +16,8 @@ BOOST_AUTO_TEST_CASE(server_context_applying_1pc)
trrep::client_id(1), trrep::client_id(1),
trrep::client_context::m_applier, trrep::client_context::m_applier,
false); false);
trrep::transaction_context tc( trrep::transaction_context& tc(
trrep_mock::applying_transaction( trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
char buf[1] = { 1 }; char buf[1] = { 1 };
@ -34,8 +34,8 @@ BOOST_AUTO_TEST_CASE(server_context_applying_2pc)
trrep::client_id(1), trrep::client_id(1),
trrep::client_context::m_applier, trrep::client_context::m_applier,
true); true);
trrep::transaction_context tc( trrep::transaction_context& tc(
trrep_mock::applying_transaction( trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
char buf[1] = { 1 }; char buf[1] = { 1 };
@ -54,11 +54,12 @@ BOOST_AUTO_TEST_CASE(server_context_applying_1pc_rollback)
trrep::client_context::m_applier, trrep::client_context::m_applier,
false); false);
cc.fail_next_applying(true); cc.fail_next_applying(true);
trrep::transaction_context tc( trrep::transaction_context& tc(
trrep_mock::applying_transaction( trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, tc, trrep::data(buf, 1)) == 1); BOOST_REQUIRE(sc.on_apply(cc, tc, trrep::data(buf, 1)) == 1);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
} }
@ -74,8 +75,8 @@ BOOST_AUTO_TEST_CASE(server_context_applying_2pc_rollback)
trrep::client_context::m_applier, trrep::client_context::m_applier,
true); true);
cc.fail_next_applying(true); cc.fail_next_applying(true);
trrep::transaction_context tc( trrep::transaction_context& tc(
trrep_mock::applying_transaction( trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
char buf[1] = { 1 }; char buf[1] = { 1 };

View File

@ -7,6 +7,7 @@
#include "server_context.hpp" #include "server_context.hpp"
#include "key.hpp" #include "key.hpp"
#include "data.hpp" #include "data.hpp"
#include "compiler.hpp"
#include <iostream> // TODO: replace with proper logging utility #include <iostream> // TODO: replace with proper logging utility
#include <sstream> #include <sstream>
@ -30,40 +31,9 @@ trrep::transaction_context::transaction_context(
, rollback_replicated_for_(false) , rollback_replicated_for_(false)
{ } { }
trrep::transaction_context::transaction_context(
trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta,
uint32_t flags)
: provider_(client_context.provider())
, client_context_(client_context)
, id_(transaction_id::invalid())
, state_(s_executing)
, state_hist_()
, ws_handle_(ws_handle)
, trx_meta_(trx_meta)
, flags_(flags)
, pa_unsafe_()
, certified_(true)
, fragments_()
, rollback_replicated_for_(false)
{ }
trrep::transaction_context::~transaction_context() trrep::transaction_context::~transaction_context()
{ { }
if (active())
{
try
{
(void)client_context_.rollback(*this);
}
catch (...)
{
// TODO: Log warning
}
}
}
int trrep::transaction_context::start_transaction( int trrep::transaction_context::start_transaction(
const trrep::transaction_id& id) const trrep::transaction_id& id)
@ -86,6 +56,21 @@ int trrep::transaction_context::start_transaction(
} }
} }
int trrep::transaction_context::start_transaction(
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta,
uint32_t flags)
{
assert(active() == false);
assert(client_context_.mode() == trrep::client_context::m_applier);
state_ = s_executing;
ws_handle_ = ws_handle;
trx_meta_ = trx_meta;
flags_ = flags;
certified_ = true;
return 0;
}
int trrep::transaction_context::append_key(const trrep::key& key) int trrep::transaction_context::append_key(const trrep::key& key)
{ {
@ -104,7 +89,7 @@ int trrep::transaction_context::before_prepare()
int ret(0); int ret(0);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("before_prepare_enter");
assert(state() == s_executing || state() == s_must_abort); assert(state() == s_executing || state() == s_must_abort);
if (state() == s_must_abort) if (state() == s_must_abort)
@ -144,7 +129,7 @@ int trrep::transaction_context::before_prepare()
} }
assert(state() == s_preparing); assert(state() == s_preparing);
debug_log_state(); debug_log_state("before_prepare_leave");
return ret; return ret;
} }
@ -152,7 +137,7 @@ int trrep::transaction_context::after_prepare()
{ {
int ret(1); int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("after_prepare_enter");
assert(state() == s_preparing || state() == s_must_abort); assert(state() == s_preparing || state() == s_must_abort);
if (state() == s_must_abort) if (state() == s_must_abort)
{ {
@ -184,7 +169,7 @@ int trrep::transaction_context::after_prepare()
ret = 0; ret = 0;
break; break;
} }
debug_log_state(); debug_log_state("after_prepare_leave");
return ret; return ret;
} }
@ -193,7 +178,7 @@ int trrep::transaction_context::before_commit()
int ret(1); int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("before_commit_enter");
assert(state() == s_executing || state() == s_committing || assert(state() == s_executing || state() == s_committing ||
state() == s_must_abort); state() == s_must_abort);
@ -264,7 +249,7 @@ int trrep::transaction_context::before_commit()
} }
break; break;
} }
debug_log_state(); debug_log_state("before_commit_leave");
return ret; return ret;
} }
@ -273,14 +258,14 @@ int trrep::transaction_context::ordered_commit()
int ret(1); int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("ordered_commit_enter");
assert(state() == s_committing); assert(state() == s_committing);
assert(ordered()); assert(ordered());
ret = provider_.commit_order_leave(&ws_handle_, &trx_meta_); ret = provider_.commit_order_leave(&ws_handle_, &trx_meta_);
// Should always succeed // Should always succeed
assert(ret == 0); assert(ret == 0);
state(lock, s_ordered_commit); state(lock, s_ordered_commit);
debug_log_state(); debug_log_state("ordered_commit_leave");
return ret; return ret;
} }
@ -289,7 +274,7 @@ int trrep::transaction_context::after_commit()
int ret(0); int ret(0);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("after_commit_enter");
assert(state() == s_ordered_commit); assert(state() == s_ordered_commit);
switch (client_context_.mode()) switch (client_context_.mode())
@ -309,14 +294,14 @@ int trrep::transaction_context::after_commit()
} }
assert(ret == 0); assert(ret == 0);
state(lock, s_committed); state(lock, s_committed);
debug_log_state(); debug_log_state("after_commit_leave");
return ret; return ret;
} }
int trrep::transaction_context::before_rollback() int trrep::transaction_context::before_rollback()
{ {
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("before_rollback_enter");
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_must_abort || state() == s_must_abort ||
state() == s_cert_failed || state() == s_cert_failed ||
@ -362,14 +347,14 @@ int trrep::transaction_context::before_rollback()
assert(0); assert(0);
break; break;
} }
debug_log_state(); debug_log_state("before_rollback_leave");
return 0; return 0;
} }
int trrep::transaction_context::after_rollback() int trrep::transaction_context::after_rollback()
{ {
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("after_rollback_enter");
assert(state() == s_aborting || assert(state() == s_aborting ||
state() == s_must_replay); state() == s_must_replay);
@ -384,7 +369,7 @@ int trrep::transaction_context::after_rollback()
// during actual rollback. If the transaction has been ordered, // during actual rollback. If the transaction has been ordered,
// releasing the commit ordering critical section should be // releasing the commit ordering critical section should be
// also postponed until all resources have been released. // also postponed until all resources have been released.
debug_log_state(); debug_log_state("after_rollback_leave");
return 0; return 0;
} }
@ -392,7 +377,7 @@ int trrep::transaction_context::after_statement()
{ {
int ret(0); int ret(0);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex()); trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
debug_log_state(); debug_log_state("after_statement_enter");
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_committed || state() == s_committed ||
state() == s_aborted || state() == s_aborted ||
@ -446,7 +431,7 @@ int trrep::transaction_context::after_statement()
cleanup(); cleanup();
} }
debug_log_state(); debug_log_state("after_statement_leave");
return ret; return ret;
} }
@ -458,7 +443,7 @@ void trrep::transaction_context::state(
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
static const char allowed[n_states][n_states] = static const char allowed[n_states][n_states] =
{ /* ex pr ce co oc ct cf ma ab ad mr re from/to */ { /* ex pr ce co oc ct cf ma ab ad mr re */
{ 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
{ 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */ { 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */
{ 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ { 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
@ -700,9 +685,7 @@ void trrep::transaction_context::clear_fragments()
void trrep::transaction_context::cleanup() void trrep::transaction_context::cleanup()
{ {
// std::cerr << "Cleanup transaction " debug_log_state("cleanup_enter");
// << client_context_.id().get()
// << ": " << id_.get() << "\n";
id_ = trrep::transaction_id::invalid(); id_ = trrep::transaction_id::invalid();
if (is_streaming()) if (is_streaming())
{ {
@ -715,11 +698,17 @@ void trrep::transaction_context::cleanup()
trx_meta_.stid.conn = trrep::client_id::invalid(); trx_meta_.stid.conn = trrep::client_id::invalid();
certified_ = false; certified_ = false;
pa_unsafe_ = false; pa_unsafe_ = false;
debug_log_state("cleanup_leave");
} }
void trrep::transaction_context::debug_log_state() const void trrep::transaction_context::debug_log_state(
const std::string& context TRREP_UNUSED)
const
{ {
// std::cout << "client: " << client_context_.id().get() #if 0
// << " trx: " << id_.get() std::cout << context
// << " state: " << state_ << "\n"; << ": client: " << client_context_.id().get()
<< " trx: " << int64_t(id_.get())
<< " state: " << trrep::to_string(state_) << "\n";
#endif /* 0 */
} }

View File

@ -6,6 +6,7 @@
#define TRREP_TRANSACTION_CONTEXT_HPP #define TRREP_TRANSACTION_CONTEXT_HPP
#include "provider.hpp" #include "provider.hpp"
#include "server_context.hpp"
#include "lock.hpp" #include "lock.hpp"
#include <wsrep_api.h> #include <wsrep_api.h>
@ -55,11 +56,10 @@ namespace trrep
s_replaying s_replaying
}; };
static const int n_states = s_replaying + 1; static const int n_states = s_replaying + 1;
enum state state() const
{ return state_; }
transaction_context(trrep::client_context& client_context); transaction_context(trrep::client_context& client_context);
transaction_context(trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta,
uint32_t flags);
~transaction_context(); ~transaction_context();
// Accessors // Accessors
trrep::transaction_id id() const trrep::transaction_id id() const
@ -68,8 +68,6 @@ namespace trrep
bool active() const bool active() const
{ return (id_ != trrep::transaction_id::invalid()); } { return (id_ != trrep::transaction_id::invalid()); }
enum state state() const
{ return state_; }
void state(trrep::unique_lock<trrep::mutex>&, enum state); void state(trrep::unique_lock<trrep::mutex>&, enum state);
@ -99,6 +97,10 @@ namespace trrep
int start_transaction(const trrep::transaction_id& id); int start_transaction(const trrep::transaction_id& id);
int start_transaction(const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta,
uint32_t flags);
int append_key(const trrep::key&); int append_key(const trrep::key&);
int append_data(const trrep::data&); int append_data(const trrep::data&);
@ -128,12 +130,15 @@ namespace trrep
return flags_; return flags_;
} }
private: private:
transaction_context(const transaction_context&);
transaction_context operator=(const transaction_context&);
int certify_fragment(trrep::unique_lock<trrep::mutex>&); int certify_fragment(trrep::unique_lock<trrep::mutex>&);
int certify_commit(trrep::unique_lock<trrep::mutex>&); int certify_commit(trrep::unique_lock<trrep::mutex>&);
void remove_fragments(); void remove_fragments();
void clear_fragments(); void clear_fragments();
void cleanup(); void cleanup();
void debug_log_state() const; void debug_log_state(const std::string&) const;
trrep::provider& provider_; trrep::provider& provider_;
trrep::client_context& client_context_; trrep::client_context& client_context_;
trrep::transaction_id id_; trrep::transaction_id id_;

View File

@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc)
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc,trrep::client_id(1), trrep::mock_client_context cc(sc,trrep::client_id(1),
trrep::client_context::m_replicating); trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc)
trrep::mock_server_context sc("s1", "s1", trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -108,7 +108,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_rollback)
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc,trrep::client_id(1), trrep::mock_client_context cc(sc,trrep::client_id(1),
trrep::client_context::m_replicating); trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -144,7 +144,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit)
trrep::mock_server_context sc("s1", "s1", trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -185,7 +185,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare)
trrep::mock_server_context sc("s1", "s1", trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -226,7 +226,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare)
trrep::mock_server_context sc("s1", "s1", trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -272,7 +272,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified
trrep::mock_server_context sc("s1", "s1", trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -315,7 +315,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified)
trrep::mock_server_context sc("s1", "s1", trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync); trrep::server_context::rm_sync);
trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::mock_client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(cc); trrep::transaction_context& tc(cc.transaction());
// Verify initial state // Verify initial state
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
@ -355,7 +355,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_applying)
trrep::mock_client_context cc(sc, trrep::mock_client_context cc(sc,
trrep::client_id(1), trrep::client_id(1),
trrep::client_context::m_applier); trrep::client_context::m_applier);
trrep::transaction_context tc(trrep_mock::applying_transaction( trrep::transaction_context& tc(trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
@ -371,6 +371,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_applying)
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit);
BOOST_REQUIRE(tc.after_commit() == 0); BOOST_REQUIRE(tc.after_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
BOOST_REQUIRE(tc.active() == false);
} }
BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying) BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying)
@ -380,7 +383,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying)
trrep::mock_client_context cc(sc, trrep::mock_client_context cc(sc,
trrep::client_id(1), trrep::client_id(1),
trrep::client_context::m_applier); trrep::client_context::m_applier);
trrep::transaction_context tc(trrep_mock::applying_transaction( trrep::transaction_context& tc(trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
@ -400,6 +403,9 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying)
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit);
BOOST_REQUIRE(tc.after_commit() == 0); BOOST_REQUIRE(tc.after_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
BOOST_REQUIRE(tc.active() == false);
} }
BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback) BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback)
@ -409,7 +415,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback)
trrep::mock_client_context cc(sc, trrep::mock_client_context cc(sc,
trrep::client_id(1), trrep::client_id(1),
trrep::client_context::m_applier); trrep::client_context::m_applier);
trrep::transaction_context tc(trrep_mock::applying_transaction( trrep::transaction_context& tc(trrep_mock::start_applying_transaction(
cc, 1, 1, cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
@ -423,4 +429,8 @@ BOOST_AUTO_TEST_CASE(transaction_context_applying_rollback)
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborting);
BOOST_REQUIRE(tc.after_rollback() == 0); BOOST_REQUIRE(tc.after_rollback() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
BOOST_REQUIRE(tc.after_statement() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_aborted);
BOOST_REQUIRE(tc.active() == false);
} }