From a8be09161caa2b968c4c70e4d34ba509d9dd18f6 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Sat, 7 Jul 2018 12:01:14 +0300 Subject: [PATCH] Replaced replicating mode with local. The intended purpose for local mode was local storage access without entering replication hooks. However, the same can be achieved with high priority mode. Removed replicating mode and use local instead to denote locally processing clients. --- dbsim/db_server.cpp | 4 ++-- dbsim/db_storage_engine.cpp | 2 +- include/wsrep/client_state.hpp | 17 +++++++---------- src/client_state.cpp | 19 +++++++++---------- src/transaction.cpp | 26 +++++++------------------- test/client_state_fixture.hpp | 14 +++++++------- 6 files changed, 33 insertions(+), 49 deletions(-) diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index b9cae18..15c677c 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -96,7 +96,7 @@ void db::server::start_client(size_t id) { auto client(std::make_shared( *this, id, - wsrep::client_state::m_replicating, + wsrep::client_state::m_local, simulator_.params())); clients_.push_back(client); client_threads_.push_back( @@ -115,7 +115,7 @@ wsrep::client_state* db::server::local_client_state() std::ostringstream id_os; size_t client_id(++last_client_id_); db::client* client(new db::client(*this, client_id, - wsrep::client_state::m_replicating, + wsrep::client_state::m_local, simulator_.params())); return &client->client_state(); } diff --git a/dbsim/db_storage_engine.cpp b/dbsim/db_storage_engine.cpp index 5ac9622..2fc400b 100644 --- a/dbsim/db_storage_engine.cpp +++ b/dbsim/db_storage_engine.cpp @@ -53,7 +53,7 @@ void db::storage_engine::bf_abort_some(const wsrep::transaction& txc) for (auto victim : transactions_) { wsrep::client_state& cc(victim->client_state()); - if (cc.mode() == wsrep::client_state::m_replicating) + if (cc.mode() == wsrep::client_state::m_local) { if (victim->bf_abort(txc.seqno())) { diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 58302fa..0792770 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -75,10 +75,8 @@ namespace wsrep */ enum mode { - /** Operates in local only mode, no replication. */ + /** Locally operating client session. */ m_local, - /** Generates write sets for replication by the provider. */ - m_replicating, /** High priority mode */ m_high_priority, /** Client is in total order isolation mode */ @@ -262,7 +260,7 @@ namespace wsrep */ int append_key(const wsrep::key& key) { - assert(mode_ == m_replicating); + assert(mode_ == m_local); assert(state_ == s_exec); return transaction_.append_key(key); } @@ -272,7 +270,7 @@ namespace wsrep */ int append_data(const wsrep::const_buffer& data) { - assert(mode_ == m_replicating); + assert(mode_ == m_local); assert(state_ == s_exec); return transaction_.append_data(data); } @@ -286,7 +284,7 @@ namespace wsrep */ int after_row() { - assert(mode_ == m_replicating); + assert(mode_ == m_local); assert(state_ == s_exec); return (transaction_.streaming_context_.fragment_size() ? transaction_.after_row() : 0); @@ -377,7 +375,7 @@ namespace wsrep */ void sync_rollback_complete() { - assert(state_ == s_idle && mode_ == m_replicating && + assert(state_ == s_idle && mode_ == m_local && transaction_.state() == wsrep::transaction::s_aborted); cond_.notify_all(); } @@ -389,7 +387,7 @@ namespace wsrep int bf_abort(wsrep::seqno bf_seqno) { wsrep::unique_lock lock(mutex_); - assert(mode_ == m_replicating); + assert(mode_ == m_local); return transaction_.bf_abort(lock, bf_seqno); } @@ -442,7 +440,7 @@ namespace wsrep /** * Return the mode where client entered into TOI mode. - * The return value can be either m_replicating or + * The return value can be either m_local or * m_high_priority. */ enum mode toi_mode() const @@ -724,7 +722,6 @@ namespace wsrep switch (mode) { case wsrep::client_state::m_local: return "local"; - case wsrep::client_state::m_replicating: return "replicating"; case wsrep::client_state::m_high_priority: return "high priority"; case wsrep::client_state::m_toi: return "toi"; case wsrep::client_state::m_rsu: return "rsu"; diff --git a/src/client_state.cpp b/src/client_state.cpp index ccd3727..ebde98c 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -202,7 +202,7 @@ int wsrep::client_state::after_statement() (void)transaction_.after_statement(); if (current_error() == wsrep::e_deadlock_error) { - if (mode_ == m_replicating) + if (mode_ == m_local) { debug_log_state("after_statement: may_retry"); return 1; @@ -222,7 +222,7 @@ int wsrep::client_state::enable_streaming( fragment_unit, size_t fragment_size) { - assert(mode_ == m_replicating); + assert(mode_ == m_local); if (transaction_.active() && transaction_.streaming_context_.fragment_unit() != fragment_unit) @@ -242,7 +242,7 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys, int flags) { assert(state_ == s_exec); - assert(mode_ == m_replicating); + assert(mode_ == m_local); int ret; switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags)) { @@ -276,7 +276,7 @@ int wsrep::client_state::enter_toi(const wsrep::ws_meta& ws_meta) int wsrep::client_state::leave_toi() { int ret; - if (toi_mode_ == m_replicating) + if (toi_mode_ == m_local) { switch (provider().leave_toi(id_)) { @@ -431,12 +431,11 @@ void wsrep::client_state::mode( { assert(lock.owns_lock()); static const char allowed[n_modes_][n_modes_] = - { /* l r h t r */ - { 0, 0, 0, 0, 0 }, /* local */ - { 0, 0, 1, 1, 1 }, /* repl */ - { 0, 1, 0, 1, 0 }, /* high prio */ - { 0, 1, 1, 0, 0 }, /* toi */ - { 0, 1, 0, 0, 0 } /* rsu */ + { /* l h t r */ + { 0, 1, 1, 1 }, /* local */ + { 1, 0, 1, 0 }, /* high prio */ + { 1, 1, 0, 0 }, /* toi */ + { 1, 0, 0, 0 } /* rsu */ }; if (allowed[mode_][mode]) { diff --git a/src/transaction.cpp b/src/transaction.cpp index 5a244e0..7e1d341 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -58,11 +58,10 @@ int wsrep::transaction::start_transaction( flags_ |= wsrep::provider::flag::start_transaction; switch (client_state_.mode()) { - case wsrep::client_state::m_local: case wsrep::client_state::m_high_priority: debug_log_state("start_transaction success"); return 0; - case wsrep::client_state::m_replicating: + case wsrep::client_state::m_local: debug_log_state("start_transaction success"); return provider().start_transaction(ws_handle_); default: @@ -162,7 +161,7 @@ int wsrep::transaction::before_prepare( if (state() == s_must_abort) { - assert(client_state_.mode() == wsrep::client_state::m_replicating); + assert(client_state_.mode() == wsrep::client_state::m_local); client_state_.override_error(wsrep::e_deadlock_error); return 1; } @@ -171,7 +170,7 @@ int wsrep::transaction::before_prepare( switch (client_state_.mode()) { - case wsrep::client_state::m_replicating: + case wsrep::client_state::m_local: if (is_streaming()) { client_service_.debug_crash( @@ -193,7 +192,6 @@ int wsrep::transaction::before_prepare( "crash_last_fragment_commit_after_fragment_removal"); } break; - case wsrep::client_state::m_local: case wsrep::client_state::m_high_priority: if (is_streaming()) { @@ -219,21 +217,20 @@ int wsrep::transaction::after_prepare( assert(state() == s_preparing || state() == s_must_abort); if (state() == s_must_abort) { - assert(client_state_.mode() == wsrep::client_state::m_replicating); + assert(client_state_.mode() == wsrep::client_state::m_local); client_state_.override_error(wsrep::e_deadlock_error); return 1; } switch (client_state_.mode()) { - case wsrep::client_state::m_replicating: + case wsrep::client_state::m_local: ret = certify_commit(lock); assert((ret == 0 && state() == s_committing) || (state() == s_must_abort || state() == s_must_replay || state() == s_cert_failed)); break; - case wsrep::client_state::m_local: case wsrep::client_state::m_high_priority: state(lock, s_certifying); state(lock, s_committing); @@ -264,12 +261,6 @@ int wsrep::transaction::before_commit() switch (client_state_.mode()) { case wsrep::client_state::m_local: - if (ordered()) - { - ret = provider().commit_order_enter(ws_handle_, ws_meta_); - } - break; - case wsrep::client_state::m_replicating: if (state() == s_executing) { assert(client_service_.do_2pc() == false); @@ -380,7 +371,7 @@ int wsrep::transaction::after_commit() if (is_streaming()) { - assert(client_state_.mode() == wsrep::client_state::m_replicating || + assert(client_state_.mode() == wsrep::client_state::m_local || client_state_.mode() == wsrep::client_state::m_high_priority); clear_fragments(); } @@ -388,9 +379,6 @@ int wsrep::transaction::after_commit() switch (client_state_.mode()) { case wsrep::client_state::m_local: - // Nothing to do - break; - case wsrep::client_state::m_replicating: ret = provider().release(ws_handle_); break; case wsrep::client_state::m_high_priority: @@ -735,7 +723,7 @@ int wsrep::transaction::certify_fragment( { assert(lock.owns_lock()); - assert(client_state_.mode() == wsrep::client_state::m_replicating); + assert(client_state_.mode() == wsrep::client_state::m_local); assert(streaming_context_.rolled_back() == false); client_service_.wait_for_replayers(lock); diff --git a/test/client_state_fixture.hpp b/test/client_state_fixture.hpp index 881f2b0..95c65f2 100644 --- a/test/client_state_fixture.hpp +++ b/test/client_state_fixture.hpp @@ -18,7 +18,7 @@ namespace replicating_client_fixture_sync_rm() : sc("s1", "s1", wsrep::server_state::rm_sync) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id()); @@ -38,7 +38,7 @@ namespace replicating_client_fixture_async_rm() : sc("s1", "s1", wsrep::server_state::rm_async) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id()); @@ -58,7 +58,7 @@ namespace replicating_client_fixture_2pc() : sc("s1", "s1", wsrep::server_state::rm_sync) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id()); @@ -79,7 +79,7 @@ namespace replicating_client_fixture_autocommit() : sc("s1", "s1", wsrep::server_state::rm_sync) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id()); @@ -160,7 +160,7 @@ namespace : sc("s1", "s1", wsrep::server_state::rm_sync) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id()); @@ -182,7 +182,7 @@ namespace : sc("s1", "s1", wsrep::server_state::rm_sync) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id()); @@ -204,7 +204,7 @@ namespace : sc("s1", "s1", wsrep::server_state::rm_sync) , cc(sc, wsrep::client_id(1), - wsrep::client_state::m_replicating) + wsrep::client_state::m_local) , tc(cc.transaction()) { cc.open(cc.id());