diff --git a/CMakeLists.txt b/CMakeLists.txt index d5988f8..baf9004 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ include(CheckIncludeFile) include(CTest) # Options -option(WITH_AUTO_TEST "Run unit tests automatically after build" ON) +option(WITH_AUTO_TEST "Run unit tests automatically after build" OFF) option(WITH_ASAN "Enable address sanitizer" OFF) option(WITH_TSAN "Enable thread sanitizer" OFF) diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index e9ac3a4..a40d4b0 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -23,11 +23,14 @@ db::client::client(db::server& server, void db::client::start() { + client_state_.open(client_state_.id()); for (size_t i(0); i < params_.n_transactions; ++i) { run_one_transaction(); report_progress(i + 1); } + client_state_.close(); + client_state_.cleanup(); } bool db::client::bf_abort(wsrep::seqno seqno) diff --git a/dbsim/db_client_service.cpp b/dbsim/db_client_service.cpp index 3979ef7..d220b4e 100644 --- a/dbsim/db_client_service.cpp +++ b/dbsim/db_client_service.cpp @@ -34,6 +34,7 @@ int db::client_service::rollback() assert(ret == 0); return ret; } + enum wsrep::provider::status db::client_service::replay() { diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index 8fd1b5a..002f253 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -35,6 +35,7 @@ void db::server::applier_thread() simulator_.params()); wsrep::client_state* cc(static_cast( &applier.client_state())); + cc->open(cc->id()); enum wsrep::provider::status ret( server_state_.provider().run_applier(cc)); wsrep::log_info() << "Applier thread exited with error code " << ret; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 3c219af..7b1d28f 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -33,7 +33,7 @@ namespace wsrep e_append_fragment_error }; - static inline std::string to_string(enum client_error error) + static inline const char* to_string(enum client_error error) { switch (error) { @@ -73,6 +73,10 @@ namespace wsrep */ enum state { + /** + * Client session has not been initialized yet. + */ + s_none, /** * Client is idle, the control is in the application which * uses the DBMS system. @@ -102,7 +106,7 @@ namespace wsrep */ void store_globals() { - thread_id_ = wsrep::this_thread::get_id(); + current_thread_id_ = wsrep::this_thread::get_id(); } /** @@ -112,6 +116,33 @@ namespace wsrep { assert(transaction_.active() == false); } + + /** @name Client session handling */ + /** @{ */ + /** + * This method should be called when opening the client session. + * + * Initializes client id and changes the state to s_idle. + */ + void open(wsrep::client_id); + + /** + * This method should be called before closing the client session. + * + * The state is changed to s_quitting and any open transactions + * are rolled back. + */ + void close(); + + /** + * This method should be called after closing the client session + * to clean up. + * + * The state is changed to s_none. + */ + void cleanup(); + /** @} */ + /** @name Client command handling */ /** @{ */ /** @@ -306,13 +337,19 @@ namespace wsrep /** @} */ int before_rollback() { - assert(state_ == s_idle || state_ == s_exec || state_ == s_result); + assert(state_ == s_idle || + state_ == s_exec || + state_ == s_result || + state_ == s_quitting); return transaction_.before_rollback(); } int after_rollback() { - assert(state_ == s_idle || state_ == s_exec || state_ == s_result); + assert(state_ == s_idle || + state_ == s_exec || + state_ == s_result || + state_ == s_quitting); return transaction_.after_rollback(); } /** @} */ @@ -513,13 +550,14 @@ namespace wsrep wsrep::client_service& client_service, const client_id& id, enum mode mode) - : thread_id_(wsrep::this_thread::get_id()) + : owning_thread_id_(wsrep::this_thread::get_id()) + , current_thread_id_(owning_thread_id_) , mutex_(mutex) , server_state_(server_state) , client_service_(client_service) , id_(id) , mode_(mode) - , state_(s_idle) + , state_(s_none) , transaction_(*this) , allow_dirty_reads_() , debug_log_level_(0) @@ -539,7 +577,8 @@ namespace wsrep void state(wsrep::unique_lock& lock, enum state state); void override_error(enum wsrep::client_error error); - wsrep::thread::id thread_id_; + wsrep::thread::id owning_thread_id_; + wsrep::thread::id current_thread_id_; wsrep::mutex& mutex_; wsrep::server_state& server_state_; wsrep::client_service& client_service_; @@ -552,6 +591,18 @@ namespace wsrep wsrep::client_error current_error_; }; + static inline const char* to_string(enum wsrep::client_state::state state) + { + switch (state) + { + case wsrep::client_state::s_none: return "none"; + case wsrep::client_state::s_idle: return "idle"; + case wsrep::client_state::s_exec: return "exec"; + case wsrep::client_state::s_result: return "result"; + case wsrep::client_state::s_quitting: return "quit"; + } + return "unknown"; + } class client_state_switch { public: diff --git a/src/client_state.cpp b/src/client_state.cpp index 29d0bb3..c88517a 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -15,9 +15,35 @@ wsrep::provider& wsrep::client_state::provider() const return server_state_.provider(); } +void wsrep::client_state::open(wsrep::client_id id) +{ + wsrep::unique_lock lock(mutex_); + owning_thread_id_ = wsrep::this_thread::get_id(); + current_thread_id_ = owning_thread_id_; + state(lock, s_idle); + id_ = id; +} + +void wsrep::client_state::close() +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_quitting); + lock.unlock(); + if (transaction_.active()) + { + client_service_.rollback(); + } +} + +void wsrep::client_state::cleanup() +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_none); +} + void wsrep::client_state::override_error(enum wsrep::client_error error) { - assert(wsrep::this_thread::get_id() == thread_id_); + assert(wsrep::this_thread::get_id() == owning_thread_id_); if (current_error_ != wsrep::e_success && error == wsrep::e_success) { @@ -222,15 +248,16 @@ void wsrep::client_state::state( wsrep::unique_lock& lock WSREP_UNUSED, enum wsrep::client_state::state state) { - assert(wsrep::this_thread::get_id() == thread_id_); + assert(wsrep::this_thread::get_id() == owning_thread_id_); assert(lock.owns_lock()); static const char allowed[state_max_][state_max_] = { - /* idle exec result quit */ - { 0, 1, 0, 1}, /* idle */ - { 0, 0, 1, 0}, /* exec */ - { 1, 0, 0, 1}, /* result */ - { 0, 0, 0, 0} /* quit */ + /* none idle exec result quit */ + { 0, 1, 0, 0, 0}, /* none */ + { 0, 0, 1, 0, 1}, /* idle */ + { 0, 0, 0, 1, 0}, /* exec */ + { 0, 1, 0, 0, 0}, /* result */ + { 1, 0, 0, 0, 0} /* quit */ }; if (allowed[state_][state]) { diff --git a/test/client_state_fixture.hpp b/test/client_state_fixture.hpp index 1a0cd71..881f2b0 100644 --- a/test/client_state_fixture.hpp +++ b/test/client_state_fixture.hpp @@ -21,6 +21,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state @@ -40,6 +41,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state @@ -59,6 +61,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); cc.do_2pc_ = true; BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -79,6 +82,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); cc.is_autocommit_ = true; BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -101,6 +105,7 @@ namespace wsrep::client_state::m_high_priority) , tc(cc.transaction()) { + cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); wsrep::ws_handle ws_handle(1, (void*)1); @@ -129,6 +134,7 @@ namespace wsrep::client_state::m_high_priority) , tc(cc.transaction()) { + cc.open(cc.id()); cc.do_2pc_ = true; BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -157,6 +163,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state @@ -178,6 +185,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state @@ -199,6 +207,7 @@ namespace wsrep::client_state::m_replicating) , tc(cc.transaction()) { + cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index b3884c8..6943153 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -32,15 +32,20 @@ namespace wsrep { return provider_; } wsrep::client_state* local_client_state() { - return new wsrep::mock_client( - *this, ++last_client_id_, - wsrep::client_state::m_local); + wsrep::client_state* ret(new wsrep::mock_client( + *this, ++last_client_id_, + wsrep::client_state::m_local)); + ret->open(ret->id()); + return ret; } wsrep::client_state* streaming_applier_client_state() { - return new wsrep::mock_client( - *this, ++last_client_id_, - wsrep::client_state::m_high_priority); + wsrep::client_state* ret( + new wsrep::mock_client( + *this, ++last_client_id_, + wsrep::client_state::m_high_priority)); + ret->open(ret->id()); + return ret; } void release_client_state(wsrep::client_state* client_state) diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 5d017b7..7c61ebf 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -23,6 +23,7 @@ namespace wsrep::provider::flag::start_transaction | wsrep::provider::flag::commit) { + cc.open(cc.id()); } wsrep::mock_server_state sc; wsrep::mock_client cc; @@ -94,6 +95,7 @@ BOOST_AUTO_TEST_CASE(server_state_streaming) wsrep::stid(wsrep::id("1"), 1, 1), wsrep::seqno(0), wsrep::provider::flag::start_transaction); + cc.open(cc.id()); BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, wsrep::const_buffer("1", 1)) == 0); BOOST_REQUIRE(sc.find_streaming_applier(