mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-06-14 15:02:27 +03:00
codership/wsrep-lib#23 before_command() wait for ongoing rollbacks leaks
Storing information that background rollbacker in ongoing in client state has_rollback_ This can be used for detecting if there is ongoing background rollback, and client should keep waiting in before_command() entry to avoid conflicts in accessing client state during background rollbacking. transaction::bf_abort() is modified to set has_rollback_ flag when backgroung rollbacking has been assigned for the client sync_rollback_complete() method has been modified to reset the backround rollbacker flag
This commit is contained in:
@ -428,12 +428,14 @@ namespace wsrep
|
|||||||
/**
|
/**
|
||||||
* This method should be called by the background rollbacker
|
* This method should be called by the background rollbacker
|
||||||
* thread after the rollback is complete. This will allow
|
* thread after the rollback is complete. This will allow
|
||||||
* the client to proceed with command execution.
|
* the client to proceed through before_command().
|
||||||
*/
|
*/
|
||||||
void sync_rollback_complete()
|
void sync_rollback_complete()
|
||||||
{
|
{
|
||||||
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||||
assert(state_ == s_idle && mode_ == m_local &&
|
assert(state_ == s_idle && mode_ == m_local &&
|
||||||
transaction_.state() == wsrep::transaction::s_aborted);
|
transaction_.state() == wsrep::transaction::s_aborted);
|
||||||
|
set_rollbacker(false);
|
||||||
cond_.notify_all();
|
cond_.notify_all();
|
||||||
}
|
}
|
||||||
/** @} */
|
/** @} */
|
||||||
@ -707,6 +709,7 @@ namespace wsrep
|
|||||||
enum mode mode)
|
enum mode mode)
|
||||||
: owning_thread_id_(wsrep::this_thread::get_id())
|
: owning_thread_id_(wsrep::this_thread::get_id())
|
||||||
, current_thread_id_(owning_thread_id_)
|
, current_thread_id_(owning_thread_id_)
|
||||||
|
, has_rollbacker_(false)
|
||||||
, mutex_(mutex)
|
, mutex_(mutex)
|
||||||
, cond_(cond)
|
, cond_(cond)
|
||||||
, server_state_(server_state)
|
, server_state_(server_state)
|
||||||
@ -749,6 +752,7 @@ namespace wsrep
|
|||||||
|
|
||||||
wsrep::thread::id owning_thread_id_;
|
wsrep::thread::id owning_thread_id_;
|
||||||
wsrep::thread::id current_thread_id_;
|
wsrep::thread::id current_thread_id_;
|
||||||
|
bool has_rollbacker_;
|
||||||
wsrep::mutex& mutex_;
|
wsrep::mutex& mutex_;
|
||||||
wsrep::condition_variable& cond_;
|
wsrep::condition_variable& cond_;
|
||||||
wsrep::server_state& server_state_;
|
wsrep::server_state& server_state_;
|
||||||
@ -766,6 +770,21 @@ namespace wsrep
|
|||||||
int debug_log_level_;
|
int debug_log_level_;
|
||||||
enum wsrep::client_error current_error_;
|
enum wsrep::client_error current_error_;
|
||||||
enum wsrep::provider::status current_error_status_;
|
enum wsrep::provider::status current_error_status_;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assigns external rollbacker thread for the client
|
||||||
|
* this will block client in before_command(), until
|
||||||
|
* rolbacker has released the client
|
||||||
|
*/
|
||||||
|
void set_rollbacker(bool value)
|
||||||
|
{
|
||||||
|
has_rollbacker_ = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool has_rollbacker()
|
||||||
|
{
|
||||||
|
return(has_rollbacker_);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
static inline const char* to_c_string(
|
static inline const char* to_c_string(
|
||||||
|
@ -37,6 +37,7 @@ void wsrep::client_state::open(wsrep::client_id id)
|
|||||||
debug_log_state("open: enter");
|
debug_log_state("open: enter");
|
||||||
owning_thread_id_ = wsrep::this_thread::get_id();
|
owning_thread_id_ = wsrep::this_thread::get_id();
|
||||||
current_thread_id_ = owning_thread_id_;
|
current_thread_id_ = owning_thread_id_;
|
||||||
|
has_rollbacker_ = false;
|
||||||
state(lock, s_idle);
|
state(lock, s_idle);
|
||||||
id_ = id;
|
id_ = id;
|
||||||
debug_log_state("open: leave");
|
debug_log_state("open: leave");
|
||||||
@ -86,7 +87,10 @@ int wsrep::client_state::before_command()
|
|||||||
if (transaction_.active() &&
|
if (transaction_.active() &&
|
||||||
server_state_.rollback_mode() == wsrep::server_state::rm_sync)
|
server_state_.rollback_mode() == wsrep::server_state::rm_sync)
|
||||||
{
|
{
|
||||||
while (transaction_.state() == wsrep::transaction::s_aborting)
|
/*
|
||||||
|
* has_rollbacker() returns false, when background rollback is over
|
||||||
|
*/
|
||||||
|
while (has_rollbacker())
|
||||||
{
|
{
|
||||||
cond_.wait(lock);
|
cond_.wait(lock);
|
||||||
}
|
}
|
||||||
|
@ -918,6 +918,8 @@ bool wsrep::transaction::bf_abort(
|
|||||||
// between releasing the lock and before background
|
// between releasing the lock and before background
|
||||||
// rollbacker gets control.
|
// rollbacker gets control.
|
||||||
state(lock, wsrep::transaction::s_aborting);
|
state(lock, wsrep::transaction::s_aborting);
|
||||||
|
client_state_.set_rollbacker(true);
|
||||||
|
|
||||||
if (client_state_.mode() == wsrep::client_state::m_high_priority)
|
if (client_state_.mode() == wsrep::client_state::m_high_priority)
|
||||||
{
|
{
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
@ -43,5 +43,4 @@ namespace wsrep_test
|
|||||||
void bf_abort_provider(wsrep::mock_server_state& sc,
|
void bf_abort_provider(wsrep::mock_server_state& sc,
|
||||||
const wsrep::transaction& tc,
|
const wsrep::transaction& tc,
|
||||||
wsrep::seqno bf_seqno);
|
wsrep::seqno bf_seqno);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -899,6 +899,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
|
|||||||
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
||||||
BOOST_REQUIRE(tc.active() == true);
|
BOOST_REQUIRE(tc.active() == true);
|
||||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted);
|
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted);
|
||||||
|
cc.sync_rollback_complete();
|
||||||
BOOST_REQUIRE(cc.before_command() == 1);
|
BOOST_REQUIRE(cc.before_command() == 1);
|
||||||
BOOST_REQUIRE(tc.active() == false);
|
BOOST_REQUIRE(tc.active() == false);
|
||||||
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
||||||
@ -923,6 +924,7 @@ BOOST_FIXTURE_TEST_CASE(
|
|||||||
wsrep_test::bf_abort_unordered(cc);
|
wsrep_test::bf_abort_unordered(cc);
|
||||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted);
|
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted);
|
||||||
BOOST_REQUIRE(tc.active());
|
BOOST_REQUIRE(tc.active());
|
||||||
|
cc.sync_rollback_complete();
|
||||||
BOOST_REQUIRE(cc.before_command() == 1);
|
BOOST_REQUIRE(cc.before_command() == 1);
|
||||||
BOOST_REQUIRE(tc.active() == false);
|
BOOST_REQUIRE(tc.active() == false);
|
||||||
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
||||||
|
Reference in New Issue
Block a user