diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 5f2282e..2944910 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -667,21 +667,25 @@ namespace wsrep /** @{*/ /** - * Enter total order isolation critical section. + * Enter total order isolation critical section. If the wait_until + * is given non-default value, the operation is retried until + * successful, the given time point is reached or the client is + * interupted. * * @param key_array Array of keys * @param buffer Buffer containing the action to execute inside * total order isolation section * @param flags Provider flags for TOI operation - * @todo Flags argument is not necessary for TOI operation, - * they should always have start_transaction | commit - * when passed to provider. + * @param wait_until Time point to wait until for successful + * certification. * * @return Zero on success, non-zero otherwise. */ - int enter_toi_local(const wsrep::key_array& key_array, - const wsrep::const_buffer& buffer, - int flags); + int enter_toi_local( + const wsrep::key_array& key_array, + const wsrep::const_buffer& buffer, + std::chrono::time_point + wait_until = std::chrono::time_point()); /** * Enter applier TOI mode * @@ -743,13 +747,20 @@ namespace wsrep * the provider, the provider error code can be retrieved * by current_error_status() call. * + * If the wait_until is given non-default value, the operation is + * retried until successful, the given time point is reached or the + * client is interupted. + * * @param keys Array of keys for NBO operation. * @param buffer NBO write set - * + * @param wait_until Time point to wait until for successful certification. * @return Zero in case of success, non-zero in case of failure. */ - int begin_nbo_phase_one(const wsrep::key_array& keys, - const wsrep::const_buffer& buffer); + int begin_nbo_phase_one( + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + std::chrono::time_point + wait_until = std::chrono::time_point()); /** * End non-blocking operation phase after aquiring required @@ -996,7 +1007,7 @@ namespace wsrep enum wsrep::provider::status status = wsrep::provider::success); - void enter_toi_common(); + void enter_toi_common(wsrep::unique_lock&); void leave_toi_common(); wsrep::thread::id owning_thread_id_; diff --git a/src/client_state.cpp b/src/client_state.cpp index 7928ede..1b8fbf0 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -330,26 +330,44 @@ void wsrep::client_state::disable_streaming() // TOI // ////////////////////////////////////////////////////////////////////////////// -void wsrep::client_state::enter_toi_common() +void wsrep::client_state::enter_toi_common( + wsrep::unique_lock& lock) { - wsrep::unique_lock lock(mutex_); + assert(lock.owns_lock()); toi_mode_ = mode_; mode(lock, m_toi); } int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys, const wsrep::const_buffer& buffer, - int flags) + std::chrono::time_point wait_until) { debug_log_state("enter_toi_local: enter"); assert(state_ == s_exec); assert(mode_ == m_local); int ret; - switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags)) + + enum wsrep::provider::status status; + wsrep::unique_lock lock(mutex_); + // Poll until timeout expires or the client is interrupted. + do + { + lock.unlock(); + status = provider().enter_toi(id_, keys, buffer, toi_meta_, + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit); + lock.lock(); + } + while (status == wsrep::provider::error_certification_failed && + wait_until.time_since_epoch().count() && + wait_until < std::chrono::steady_clock::now() && + not client_service_.interrupted(lock)); + + switch (status) { case wsrep::provider::success: { - enter_toi_common(); + enter_toi_common(lock); ret = 0; break; } @@ -366,8 +384,9 @@ int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys, void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta) { debug_log_state("enter_toi_mode: enter"); + wsrep::unique_lock lock(mutex_); assert(mode_ == m_high_priority); - enter_toi_common(); + enter_toi_common(lock); toi_meta_ = ws_meta; debug_log_state("enter_toi_mode: leave"); } @@ -455,8 +474,10 @@ int wsrep::client_state::end_rsu() // NBO // /////////////////////////////////////////////////////////////////////////////// -int wsrep::client_state::begin_nbo_phase_one(const wsrep::key_array& keys, - const wsrep::const_buffer& buffer) +int wsrep::client_state::begin_nbo_phase_one( + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + std::chrono::time_point wait_until) { debug_log_state("begin_nbo_phase_one: enter"); wsrep::unique_lock lock(mutex_); @@ -464,16 +485,19 @@ int wsrep::client_state::begin_nbo_phase_one(const wsrep::key_array& keys, assert(mode_ == m_local); assert(toi_mode_ == m_undefined); - /** - * @todo Implement retrying if the call fails due to certification - * failure. - */ - lock.unlock(); - enum wsrep::provider::status status( - provider().enter_toi( - id_, keys, buffer, toi_meta_, - wsrep::provider::flag::start_transaction)); - lock.lock(); + enum wsrep::provider::status status; + // Poll until timeout expires or the client is interrupted. + do + { + lock.unlock(); + status = provider().enter_toi(id_, keys, buffer, toi_meta_, + wsrep::provider::flag::start_transaction); + lock.lock(); + } + while (status == wsrep::provider::error_certification_failed && + wait_until.time_since_epoch().count() && + wait_until < std::chrono::steady_clock::now() && + not client_service_.interrupted(lock)); int ret; switch (status) { diff --git a/test/toi_test.cpp b/test/toi_test.cpp index f629e40..e637017 100644 --- a/test/toi_test.cpp +++ b/test/toi_test.cpp @@ -33,9 +33,7 @@ BOOST_FIXTURE_TEST_CASE(test_toi_mode, key.append_key_part("k2", 2); wsrep::key_array keys{key}; wsrep::const_buffer buf("toi", 3); - BOOST_REQUIRE(cc.enter_toi_local(keys, buf, - wsrep::provider::flag::start_transaction | - wsrep::provider::flag::commit) == 0); + BOOST_REQUIRE(cc.enter_toi_local(keys, buf) == 0); BOOST_REQUIRE(cc.mode() == wsrep::client_state::m_toi); BOOST_REQUIRE(cc.in_toi()); BOOST_REQUIRE(cc.toi_mode() == wsrep::client_state::m_local);