diff --git a/dbsim/db_client_service.cpp b/dbsim/db_client_service.cpp index d220b4e..fa4c4f4 100644 --- a/dbsim/db_client_service.cpp +++ b/dbsim/db_client_service.cpp @@ -5,7 +5,7 @@ #include "db_client_service.hpp" #include "db_client.hpp" -int db::client_service::apply(const wsrep::const_buffer&) +int db::client_service::apply_write_set(const wsrep::const_buffer&) { db::client* client(client_state_.client()); client->se_trx_.start(client); @@ -13,6 +13,11 @@ int db::client_service::apply(const wsrep::const_buffer&) return 0; } +int db::client_service::apply_toi(const wsrep::const_buffer&) +{ + return 0; +} + int db::client_service::commit(const wsrep::ws_handle&, const wsrep::ws_meta&) { diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index a53cfd9..08e848d 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -67,7 +67,9 @@ namespace db void remove_fragments() override { } - int apply(const wsrep::const_buffer&) override; + int apply_write_set(const wsrep::const_buffer&) override; + + int apply_toi(const wsrep::const_buffer&) override; int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 4111fa2..e284564 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -81,8 +81,23 @@ namespace wsrep /** * Apply a write set. + * + * A write set applying happens always + * as a part of the transaction. The caller must start a + * new transaction before applying a write set and must + * either commit to make changes persistent or roll back. */ - virtual int apply(const wsrep::const_buffer&) = 0; + virtual int apply_write_set(const wsrep::const_buffer&) = 0; + + /** + * Apply a TOI operation. + * + * TOI operation is a standalone operation and should not + * be executed as a part of a transaction. + */ + virtual int apply_toi(const wsrep::const_buffer&) = 0; + + // // Interface to global server state // diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 182408c..5ceb811 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -399,6 +399,7 @@ namespace wsrep /** * Enter total order isolation critical section. + * * @param key_array Array of keys * @param buffer Buffer containing the action to execute inside * total order isolation section @@ -409,7 +410,30 @@ namespace wsrep int enter_toi(const wsrep::key_array& key_array, const wsrep::const_buffer& buffer, int flags); + /** + * Enter applying total order critical section. + * + * @param ws_meta Write set meta data + */ + int enter_toi(const wsrep::ws_meta& ws_meta); + /** + * Return true if the client_state is under TOI operation. + */ + bool in_toi() const + { + return (toi_meta_.seqno().is_undefined() == false); + } + + /** + * Return the mode where client entered into TOI mode. + * The return value can be either m_replicating or + * m_high_priority. + */ + enum mode toi_mode() const + { + return toi_mode_; + } /** * Leave total order isolation critical section. */ @@ -556,6 +580,7 @@ namespace wsrep , client_service_(client_service) , id_(id) , mode_(mode) + , toi_mode_() , state_(s_none) , transaction_(*this) , toi_meta_() @@ -585,6 +610,7 @@ namespace wsrep wsrep::client_service& client_service_; wsrep::client_id id_; enum mode mode_; + enum mode toi_mode_; enum state state_; wsrep::transaction transaction_; wsrep::ws_meta toi_meta_; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 97b8e17..17b77ca 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -344,7 +344,20 @@ namespace wsrep return (flags & wsrep::provider::flag::rollback); } + static inline bool is_toi(int flags) + { + return (flags & wsrep::provider::flag::isolation); + } + static inline bool is_commutative(int flags) + { + return (flags & wsrep::provider::flag::commutative); + } + + static inline bool is_native(int flags) + { + return (flags & wsrep::provider::flag::native); + } } #endif // WSREP_PROVIDER_HPP diff --git a/src/client_state.cpp b/src/client_state.cpp index 88b21f2..1299729 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -242,12 +242,14 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys, int flags) { assert(state_ == s_exec); + assert(mode_ == m_replicating); int ret; switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags)) { case wsrep::provider::success: { wsrep::unique_lock lock(mutex_); + toi_mode_ = mode_; mode(lock, m_toi); ret = 0; break; @@ -260,22 +262,38 @@ int wsrep::client_state::enter_toi(const wsrep::key_array& keys, return ret; } +int wsrep::client_state::enter_toi(const wsrep::ws_meta& ws_meta) +{ + wsrep::unique_lock lock(mutex_); + assert(mode_ == m_high_priority); + toi_mode_ = mode_; + mode(lock, m_toi); + toi_meta_ = ws_meta; + return 0; +} + int wsrep::client_state::leave_toi() { int ret; - switch (provider().leave_toi(id_)) + if (toi_mode_ == m_replicating) { - case wsrep::provider::success: - ret = 0; - break; - default: - assert(0); - override_error(wsrep::e_error_during_commit); - ret = 1; - break; + switch (provider().leave_toi(id_)) + { + case wsrep::provider::success: + ret = 0; + break; + default: + assert(0); + override_error(wsrep::e_error_during_commit); + ret = 1; + break; + } } wsrep::unique_lock lock(mutex_); - mode(lock, m_replicating); + mode(lock, toi_mode_); + toi_mode_ = m_local; + toi_meta_ = wsrep::ws_meta(); + return ret; } // Private @@ -326,11 +344,11 @@ void wsrep::client_state::mode( { assert(lock.owns_lock()); static const char allowed[mode_max_][mode_max_] = - { - { 0, 0, 0, 0}, /* local */ - { 0, 0, 1, 1}, /* repl */ - { 0, 1, 0, 0}, /* high prio */ - { 0, 1, 0, 0} /* toi */ + { /* l r h t */ + { 0, 0, 0, 0 }, /* local */ + { 0, 0, 1, 1 }, /* repl */ + { 0, 1, 0, 1 }, /* high prio */ + { 0, 1, 1, 0 } /* toi */ }; if (allowed[mode_][mode]) { diff --git a/src/exception.cpp b/src/exception.cpp index e7ae9f8..2a7a3b6 100644 --- a/src/exception.cpp +++ b/src/exception.cpp @@ -4,4 +4,4 @@ #include "wsrep/exception.hpp" -bool wsrep::abort_on_exception(false); +bool wsrep::abort_on_exception(true); diff --git a/src/server_state.cpp b/src/server_state.cpp index 252bb7d..387cec3 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -15,6 +15,196 @@ namespace { + int apply_write_set(wsrep::server_state& server_state, + wsrep::client_state& client_state, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) + { + int ret(0); + const wsrep::transaction& txc(client_state.transaction()); + assert(client_state.mode() == wsrep::client_state::m_high_priority); + + bool not_replaying(txc.state() != + wsrep::transaction::s_replaying); + + if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + if (not_replaying) + { + client_state.before_command(); + client_state.before_statement(); + assert(txc.active() == false); + client_state.start_transaction(ws_handle, ws_meta); + } + else + { + client_state.start_replaying(ws_meta); + } + + if (client_state.client_service().apply_write_set(data)) + { + ret = 1; + } + else if (client_state.client_service().commit(ws_handle, ws_meta)) + { + ret = 1; + } + + if (ret) + { + client_state.client_service().rollback(); + } + + if (not_replaying) + { + client_state.after_statement(); + client_state.after_command_before_result(); + client_state.after_command_after_result(); + } + assert(ret || + txc.state() == wsrep::transaction::s_committed); + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + assert(not_replaying); + assert(server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()) == 0); + wsrep::client_state* sac( + server_state.server_service().streaming_applier_client_state()); + server_state.start_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id(), sac); + sac->start_transaction(ws_handle, ws_meta); + { + // TODO: Client context switch will be ultimately + // implemented by the application. Therefore need to + // introduce a virtual method call which takes + // both original and sac client contexts as argument + // and a functor which does the applying. + wsrep::client_state_switch sw(client_state, *sac); + sac->before_command(); + sac->before_statement(); + sac->client_service().apply_write_set(data); + sac->after_statement(); + sac->after_command_before_result(); + sac->after_command_after_result(); + } + server_state.server_service().log_dummy_write_set(client_state, ws_meta); + } + else if (ws_meta.flags() == 0) + { + wsrep::client_state* sac( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sac == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + } + else + { + wsrep::client_state_switch(client_state, *sac); + sac->before_command(); + sac->before_statement(); + ret = sac->client_service().apply_write_set(data); + sac->after_statement(); + sac->after_command_before_result(); + sac->after_command_after_result(); + } + server_state.server_service().log_dummy_write_set( + client_state, ws_meta); + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + if (not_replaying) + { + wsrep::client_state* sac( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + assert(sac); + if (sac == 0) + { + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() + << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + } + else + { + wsrep::client_state_switch(client_state, *sac); + sac->before_command(); + sac->before_statement(); + ret = sac->client_service().commit(ws_handle, ws_meta); + sac->after_statement(); + sac->after_command_before_result(); + sac->after_command_after_result(); + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_client_state(sac); + } + } + else + { + ret = client_state.start_replaying(ws_meta) || + client_state.client_service().apply_write_set( + wsrep::const_buffer()) || + client_state.client_service().commit(ws_handle, ws_meta); + } + } + else + { + // SR fragment applying not implemented yet + assert(0); + } + if (not_replaying) + { + assert(txc.active() == false); + } + return ret; + } + + int apply_toi(wsrep::server_service&, + wsrep::client_state& client_state, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) + { + if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + // Regular toi + client_state.enter_toi(ws_meta); + int ret(client_state.client_service().apply_toi(data)); + client_state.leave_toi(); + return ret; + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + // NBO begin + throw wsrep::not_implemented_error(); + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + // NBO end + throw wsrep::not_implemented_error(); + } + else + { + assert(0); + return 0; + } + } } @@ -315,154 +505,21 @@ int wsrep::server_state::on_apply( const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) { - int ret(0); - const wsrep::transaction& txc(client_state.transaction()); - assert(client_state.mode() == wsrep::client_state::m_high_priority); - - bool not_replaying(txc.state() != - wsrep::transaction::s_replaying); - - if (starts_transaction(ws_meta.flags()) && - commits_transaction(ws_meta.flags())) + if (is_toi(ws_meta.flags())) { - if (not_replaying) - { - client_state.before_command(); - client_state.before_statement(); - assert(txc.active() == false); - client_state.start_transaction(ws_handle, ws_meta); - } - else - { - client_state.start_replaying(ws_meta); - } - - if (client_state.client_service().apply(data)) - { - ret = 1; - } - else if (client_state.client_service().commit(ws_handle, ws_meta)) - { - ret = 1; - } - - if (ret) - { - client_state.client_service().rollback(); - } - - if (not_replaying) - { - client_state.after_statement(); - client_state.after_command_before_result(); - client_state.after_command_after_result(); - } - assert(ret || - txc.state() == wsrep::transaction::s_committed); + return apply_toi(server_service_, client_state, ws_meta, data); } - else if (starts_transaction(ws_meta.flags())) + else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags())) { - assert(not_replaying); - assert(find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()) == 0); - wsrep::client_state* sac(server_service_.streaming_applier_client_state()); - start_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id(), sac); - sac->start_transaction(ws_handle, ws_meta); - { - // TODO: Client context switch will be ultimately - // implemented by the application. Therefore need to - // introduce a virtual method call which takes - // both original and sac client contexts as argument - // and a functor which does the applying. - wsrep::client_state_switch sw(client_state, *sac); - sac->before_command(); - sac->before_statement(); - sac->client_service().apply(data); - sac->after_statement(); - sac->after_command_before_result(); - sac->after_command_after_result(); - } - server_service_.log_dummy_write_set(client_state, ws_meta); - } - else if (ws_meta.flags() == 0) - { - wsrep::client_state* sac( - find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id())); - if (sac == 0) - { - // It is possible that rapid group membership changes - // may cause streaming transaction be rolled back before - // commit fragment comes in. Although this is a valid - // situation, log a warning if a sac cannot be found as - // it may be an indication of a bug too. - wsrep::log_warning() << "Could not find applier context for " - << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); - } - else - { - wsrep::client_state_switch(client_state, *sac); - sac->before_command(); - sac->before_statement(); - ret = sac->client_service().apply(data); - sac->after_statement(); - sac->after_command_before_result(); - sac->after_command_after_result(); - } - server_service_.log_dummy_write_set(client_state, ws_meta); - } - else if (commits_transaction(ws_meta.flags())) - { - if (not_replaying) - { - wsrep::client_state* sac( - find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id())); - assert(sac); - if (sac == 0) - { - // It is possible that rapid group membership changes - // may cause streaming transaction be rolled back before - // commit fragment comes in. Although this is a valid - // situation, log a warning if a sac cannot be found as - // it may be an indication of a bug too. - wsrep::log_warning() << "Could not find applier context for " - << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); - } - else - { - wsrep::client_state_switch(client_state, *sac); - sac->before_command(); - sac->before_statement(); - ret = sac->client_service().commit(ws_handle, ws_meta); - sac->after_statement(); - sac->after_command_before_result(); - sac->after_command_after_result(); - stop_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()); - server_service_.release_client_state(sac); - } - } - else - { - ret = client_state.start_replaying(ws_meta) || - client_state.client_service().apply(wsrep::const_buffer()) || - client_state.client_service().commit(ws_handle, ws_meta); - } + // Not implemented yet. + assert(0); + return 0; } else { - // SR fragment applying not implemented yet - assert(0); + return apply_write_set(*this, client_state, + ws_handle, ws_meta, data); } - if (not_replaying) - { - assert(txc.active() == false); - } - return ret; } void wsrep::server_state::start_streaming_applier( diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index b2ef338..d1ef49e 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -357,8 +357,6 @@ namespace const wsrep_trx_meta_t* meta, wsrep_bool_t* exit_loop __attribute__((unused))) { - wsrep_cb_status_t ret(WSREP_CB_SUCCESS); - wsrep::client_state* client_state( reinterpret_cast(ctx)); assert(client_state); @@ -375,13 +373,22 @@ namespace meta->stid.trx, meta->stid.conn), wsrep::seqno(seqno_from_native(meta->depends_on)), map_flags_from_native(flags)); - if (ret == WSREP_CB_SUCCESS && - client_state->server_state().on_apply( - *client_state, ws_handle, ws_meta, data)) + try { - ret = WSREP_CB_FAILURE; + if (client_state->server_state().on_apply( + *client_state, ws_handle, ws_meta, data)) + { + return WSREP_CB_FAILURE; + } + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_error() << "Caught runtime error while applying " + << ws_meta.flags() << ": " + << e.what(); + return WSREP_CB_FAILURE; } - return ret; } wsrep_cb_status_t synced_cb(void* app_ctx) diff --git a/test/mock_client_state.cpp b/test/mock_client_state.cpp index 5811c4b..93ca4ad 100644 --- a/test/mock_client_state.cpp +++ b/test/mock_client_state.cpp @@ -6,15 +6,23 @@ #include "mock_client_state.hpp" -int wsrep::mock_client_service::apply( +int wsrep::mock_client_service::apply_write_set( const wsrep::const_buffer&) { + assert(client_state_.toi_meta().seqno().is_undefined()); assert(client_state_.transaction().state() == wsrep::transaction::s_executing || client_state_.transaction().state() == wsrep::transaction::s_replaying); return (fail_next_applying_ ? 1 : 0); } +int wsrep::mock_client_service::apply_toi(const wsrep::const_buffer&) +{ + assert(client_state_.transaction().active() == false); + assert(client_state_.toi_meta().seqno().is_undefined() == false); + return (fail_next_toi_ ? 1 : 0); +} + int wsrep::mock_client_service::commit( const wsrep::ws_handle&, const wsrep::ws_meta&) { diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 0265308..bbd57e4 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -47,6 +47,7 @@ namespace wsrep , is_autocommit_() , do_2pc_() , fail_next_applying_() + , fail_next_toi_() , bf_abort_during_wait_() , error_during_prepare_data_() , killed_before_certify_() @@ -58,7 +59,9 @@ namespace wsrep , aborts_() { } - int apply(const wsrep::const_buffer&) WSREP_OVERRIDE; + int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE; + + int apply_toi(const wsrep::const_buffer&) WSREP_OVERRIDE; int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; @@ -169,6 +172,7 @@ namespace wsrep bool is_autocommit_; bool do_2pc_; bool fail_next_applying_; + bool fail_next_toi_; bool bf_abort_during_wait_; bool error_during_prepare_data_; bool killed_before_certify_;