mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-05 03:41:13 +03:00
wsrep-lib/src/client_state.cpp
Teemu Ollakka 9f153be277 Fixes to streaming rollback processing
* Count fragments certified and fragments stored separately in
  streaming context. Storing a fragment may ultimately fail due to
  a BF abort even if the fragment was successfully certified.
  Therefore a separate counter is needed for certified fragments to
  determine whether the transaction is streaming, in addition to the
  seqnos of fragments which have been successfully stored.
* Provider release is called only after successful fragment
  certification and fragment store.
* Fixed handling of write sets with the rollback flag set in
  apply_write_set()
2018-07-16 10:07:46 +03:00
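A minimal sketch of the first point follows. The class and member names are
hypothetical and may differ from the actual wsrep-lib streaming_context
interface: the certified-fragment counter answers "is this transaction
streaming?", while the stored seqnos record which fragments a later rollback
fragment must refer to.

    // Illustrative sketch only: separate bookkeeping for certified and
    // stored fragments. Names are hypothetical, not the wsrep-lib API.
    #include <cstddef>
    #include <vector>

    class streaming_context_sketch
    {
    public:
        // A fragment passed certification; the transaction is considered
        // streaming from this point on, even if storing the fragment
        // later fails because of a BF abort.
        void certified() { ++fragments_certified_; }

        // A fragment was successfully stored; remember its seqno so that
        // a later rollback fragment can refer to it.
        void stored(long long seqno) { fragment_seqnos_.push_back(seqno); }

        bool is_streaming() const { return fragments_certified_ > 0; }
        std::size_t fragments_stored() const { return fragment_seqnos_.size(); }

    private:
        std::size_t fragments_certified_{0};
        std::vector<long long> fragment_seqnos_;
    };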

//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//

#include "wsrep/client_state.hpp"
#include "wsrep/compiler.hpp"
#include "wsrep/logger.hpp"

#include <sstream>
#include <iostream>

wsrep::provider& wsrep::client_state::provider() const
{
    return server_state_.provider();
}

void wsrep::client_state::open(wsrep::client_id id)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    assert(state_ == s_none);
    debug_log_state("open: enter");
    owning_thread_id_ = wsrep::this_thread::get_id();
    current_thread_id_ = owning_thread_id_;
    state(lock, s_idle);
    id_ = id;
    debug_log_state("open: leave");
}

void wsrep::client_state::close()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("close: enter");
    state(lock, s_quitting);
    lock.unlock();
    if (transaction_.active())
    {
        client_service_.bf_rollback();
        transaction_.after_statement();
    }
    debug_log_state("close: leave");
}

void wsrep::client_state::cleanup()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("cleanup: enter");
    state(lock, s_none);
    debug_log_state("cleanup: leave");
}

void wsrep::client_state::override_error(enum wsrep::client_error error,
                                         enum wsrep::provider::status status)
{
    assert(wsrep::this_thread::get_id() == owning_thread_id_);
    if (current_error_ != wsrep::e_success &&
        error == wsrep::e_success)
    {
        throw wsrep::runtime_error("Overriding error with success");
    }
    current_error_ = error;
    current_error_status_ = status;
}

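// Called when the client session starts processing a new command.
// Transitions the client from idle to exec state. If the transaction
// was BF aborted while the client was idle, the rollback is either
// waited for (rm_sync) or performed here (rm_async), the error is
// overridden with a deadlock error and a non-zero value is returned.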
int wsrep::client_state::before_command()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("before_command: enter");
    assert(state_ == s_idle);
    if (transaction_.active() &&
        server_state_.rollback_mode() == wsrep::server_state::rm_sync)
    {
        while (transaction_.state() == wsrep::transaction::s_aborting)
        {
            cond_.wait(lock);
        }
    }
    state(lock, s_exec);
    assert(transaction_.active() == false ||
           (transaction_.state() == wsrep::transaction::s_executing ||
            transaction_.state() == wsrep::transaction::s_aborted ||
            (transaction_.state() == wsrep::transaction::s_must_abort &&
             server_state_.rollback_mode() == wsrep::server_state::rm_async)));
    if (transaction_.active())
    {
        if (transaction_.state() == wsrep::transaction::s_must_abort)
        {
            assert(server_state_.rollback_mode() ==
                   wsrep::server_state::rm_async);
            override_error(wsrep::e_deadlock_error);
            lock.unlock();
            client_service_.bf_rollback();
            (void)transaction_.after_statement();
            lock.lock();
            assert(transaction_.state() ==
                   wsrep::transaction::s_aborted);
            assert(transaction_.active() == false);
            assert(current_error() != wsrep::e_success);
            debug_log_state("before_command: error");
            return 1;
        }
        else if (transaction_.state() == wsrep::transaction::s_aborted)
        {
            // Transaction was rolled back either just before sending the
            // result to the client, or after the client_state became idle.
            // Clean up the transaction and return an error.
            override_error(wsrep::e_deadlock_error);
            lock.unlock();
            (void)transaction_.after_statement();
            lock.lock();
            assert(transaction_.active() == false);
            debug_log_state("before_command: error");
            return 1;
        }
    }
    debug_log_state("before_command: success");
    return 0;
}

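// Called after the command has been processed but before the result is
// returned to the client. A transaction which was BF aborted during
// command processing is rolled back here so that the deadlock error can
// be reported together with the result.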
void wsrep::client_state::after_command_before_result()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("after_command_before_result: enter");
    assert(state() == s_exec);
    if (transaction_.active() &&
        transaction_.state() == wsrep::transaction::s_must_abort)
    {
        override_error(wsrep::e_deadlock_error);
        lock.unlock();
        client_service_.bf_rollback();
        (void)transaction_.after_statement();
        lock.lock();
        assert(transaction_.state() == wsrep::transaction::s_aborted);
        assert(current_error() != wsrep::e_success);
    }
    state(lock, s_result);
    debug_log_state("after_command_before_result: leave");
}

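// Called after the result has been returned to the client. A transaction
// which was BF aborted while the result was being sent is rolled back,
// the sync wait GTID is reset and the client returns to the idle state.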
void wsrep::client_state::after_command_after_result()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("after_command_after_result: enter");
    assert(state() == s_result);
    assert(transaction_.state() != wsrep::transaction::s_aborting);
    if (transaction_.active() &&
        transaction_.state() == wsrep::transaction::s_must_abort)
    {
        lock.unlock();
        client_service_.bf_rollback();
        lock.lock();
        assert(transaction_.state() == wsrep::transaction::s_aborted);
        override_error(wsrep::e_deadlock_error);
    }
    else if (transaction_.active() == false)
    {
        current_error_ = wsrep::e_success;
    }
    sync_wait_gtid_ = wsrep::gtid::undefined();
    state(lock, s_idle);
    debug_log_state("after_command_after_result: leave");
}

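// Called before processing each statement of a command. If the transaction
// has already been BF aborted an error is returned; the rollback and
// cleanup are left to after_command_before_result().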
int wsrep::client_state::before_statement()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("before_statement: enter");
#if 0
    /**
     * @todo It might be beneficial to implement timed wait for
     *       server synced state.
     */
    if (allow_dirty_reads_ == false &&
        server_state_.state() != wsrep::server_state::s_synced)
    {
        return 1;
    }
#endif // 0
    if (transaction_.active() &&
        transaction_.state() == wsrep::transaction::s_must_abort)
    {
        // Rollback and cleanup will happen in after_command_before_result()
        debug_log_state("before_statement: error");
        return 1;
    }
    debug_log_state("before_statement: success");
    return 0;
}

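// Called after each statement has been processed. A BF aborted transaction
// is rolled back and statement level cleanup is done via
// transaction_.after_statement(). A non-zero return value together with a
// deadlock error indicates that the statement may be retried in local mode.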
int wsrep::client_state::after_statement()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("after_statement: enter");
    assert(state() == s_exec);
    assert(mode() == m_local);
    if (transaction_.active() &&
        transaction_.state() == wsrep::transaction::s_must_abort)
    {
        lock.unlock();
        client_service_.bf_rollback();
        lock.lock();
        assert(transaction_.state() == wsrep::transaction::s_aborted);
        override_error(wsrep::e_deadlock_error);
    }
    lock.unlock();
    (void)transaction_.after_statement();
    if (current_error() == wsrep::e_deadlock_error)
    {
        if (mode_ == m_local)
        {
            debug_log_state("after_statement: may_retry");
            return 1;
        }
        else
        {
            debug_log_state("after_statement: error");
            return 1;
        }
    }
    debug_log_state("after_statement: success");
    return 0;
}

//////////////////////////////////////////////////////////////////////////////
// Streaming //
//////////////////////////////////////////////////////////////////////////////
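// Enables streaming replication for this client. The fragment unit of an
// active streaming transaction cannot be changed; an attempt to do so is
// logged and refused.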
int wsrep::client_state::enable_streaming(
    enum wsrep::streaming_context::fragment_unit fragment_unit,
    size_t fragment_size)
{
    assert(mode_ == m_local);
    if (transaction_.is_streaming() &&
        transaction_.streaming_context().fragment_unit() != fragment_unit)
    {
        wsrep::log_error()
            << "Changing fragment unit for active streaming transaction "
            << "not allowed";
        return 1;
    }
    transaction_.streaming_context().enable(fragment_unit, fragment_size);
    return 0;
}

void wsrep::client_state::disable_streaming()
{
    assert(state_ == s_exec && mode_ == m_local);
    transaction_.streaming_context().disable();
}

//////////////////////////////////////////////////////////////////////////////
// TOI //
//////////////////////////////////////////////////////////////////////////////
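// Enters total order isolation (TOI) for a locally executed operation.
// On success the previous mode is saved in toi_mode_ and the client mode
// changes to m_toi; on failure the error is overridden with
// e_error_during_commit.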
int wsrep::client_state::enter_toi(const wsrep::key_array& keys,
                                   const wsrep::const_buffer& buffer,
                                   int flags)
{
    assert(state_ == s_exec);
    assert(mode_ == m_local);
    int ret;
    switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags))
    {
    case wsrep::provider::success:
    {
        wsrep::unique_lock<wsrep::mutex> lock(mutex_);
        toi_mode_ = mode_;
        mode(lock, m_toi);
        ret = 0;
        break;
    }
    default:
        override_error(wsrep::e_error_during_commit,
                       wsrep::provider::error_certification_failed);
        ret = 1;
        break;
    }
    return ret;
}

int wsrep::client_state::enter_toi(const wsrep::ws_meta& ws_meta)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    assert(mode_ == m_high_priority);
    toi_mode_ = mode_;
    mode(lock, m_toi);
    toi_meta_ = ws_meta;
    return 0;
}

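// Leaves total order isolation and restores the previous client mode.
// The provider leave_toi() is called only for TOI which was entered in
// local mode. If the TOI meta data contains a defined GTID, the last
// written GTID is updated before the meta data is cleared.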
int wsrep::client_state::leave_toi()
{
    // Initialize to success: the provider call below is skipped when the
    // TOI was entered in high priority mode, so ret must have a defined
    // value in that case as well.
    int ret(0);
    if (toi_mode_ == m_local)
    {
        switch (provider().leave_toi(id_))
        {
        case wsrep::provider::success:
            ret = 0;
            break;
        default:
            assert(0);
            override_error(wsrep::e_error_during_commit,
                           wsrep::provider::error_unknown);
            ret = 1;
            break;
        }
    }
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    mode(lock, toi_mode_);
    toi_mode_ = m_local;
    if (toi_meta_.gtid().is_undefined() == false)
    {
        update_last_written_gtid(toi_meta_.gtid());
    }
    toi_meta_ = wsrep::ws_meta();
    return ret;
}

///////////////////////////////////////////////////////////////////////////////
// RSU //
///////////////////////////////////////////////////////////////////////////////
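// Begins a rolling schema upgrade (RSU): desyncs the server from the
// cluster, waits for committing transactions to finish and pauses the
// provider before switching the client into m_rsu mode.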
int wsrep::client_state::begin_rsu(int timeout)
{
    if (server_state_.desync())
    {
        wsrep::log_warning() << "Failed to desync server";
        return 1;
    }
    if (server_state_.server_service().wait_committing_transactions(timeout))
    {
        wsrep::log_warning() << "RSU failed due to pending transactions";
        server_state_.resync();
        return 1;
    }
    wsrep::seqno pause_seqno(server_state_.pause());
    if (pause_seqno.is_undefined())
    {
        wsrep::log_warning() << "Failed to pause provider";
        server_state_.resync();
        return 1;
    }
    wsrep::log_info() << "Provider paused at: " << pause_seqno;
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    toi_mode_ = mode_;
    mode(lock, m_rsu);
    return 0;
}

int wsrep::client_state::end_rsu()
{
    int ret(0);
    try
    {
        server_state_.resume();
        server_state_.resync();
    }
    catch (const wsrep::runtime_error& e)
    {
        wsrep::log_warning() << "End RSU failed: " << e.what();
        ret = 1;
    }
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    mode(lock, toi_mode_);
    return ret;
}

///////////////////////////////////////////////////////////////////////////////
// Misc //
///////////////////////////////////////////////////////////////////////////////
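// Performs a causal read wait: blocks until the effects of all preceding
// cluster transactions are visible on this node or the timeout expires.
// On success the reached GTID is stored in sync_wait_gtid_.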
int wsrep::client_state::sync_wait(int timeout)
{
    std::pair<wsrep::gtid, enum wsrep::provider::status> result(
        server_state_.causal_read(timeout));
    int ret(1);
    switch (result.second)
    {
    case wsrep::provider::success:
        sync_wait_gtid_ = result.first;
        ret = 0;
        break;
    case wsrep::provider::error_not_implemented:
        override_error(wsrep::e_not_supported_error);
        break;
    default:
        override_error(wsrep::e_timeout_error);
        break;
    }
    return ret;
}

///////////////////////////////////////////////////////////////////////////////
// Private //
///////////////////////////////////////////////////////////////////////////////
void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)
{
    assert(last_written_gtid_.is_undefined() ||
           (last_written_gtid_.id() == gtid.id() &&
            last_written_gtid_.seqno() < gtid.seqno()));
    last_written_gtid_ = gtid;
}

void wsrep::client_state::debug_log_state(const char* context) const
{
    if (debug_log_level() >= 1)
    {
        wsrep::log_debug() << context
                           << "(" << id_.get()
                           << "," << to_c_string(state_)
                           << "," << to_c_string(mode_)
                           << "," << wsrep::to_string(current_error_)
                           << ")";
    }
}

void wsrep::client_state::state(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
    enum wsrep::client_state::state state)
{
    // For locally processing client states (local, toi, rsu) changing
    // the state is allowed only from the owning thread. In high priority
    // mode, however, the processing thread may change, and we only check
    // that store_globals() has been called by the current thread to gain
    // ownership.
    //
    // Note that this check assumes that there is always a single
    // thread per local client connection. This may not always
    // hold and the sanity check mechanism may need to be revised.
    assert((mode_ != m_high_priority &&
            wsrep::this_thread::get_id() == owning_thread_id_) ||
           (mode_ == m_high_priority &&
            wsrep::this_thread::get_id() == current_thread_id_));
    assert(lock.owns_lock());
    static const char allowed[state_max_][state_max_] =
    {
        /* none idle exec result quit */
        {  0,   1,   0,   0,     0 }, /* none   */
        {  0,   0,   1,   0,     1 }, /* idle   */
        {  0,   0,   0,   1,     0 }, /* exec   */
        {  0,   1,   0,   0,     0 }, /* result */
        {  1,   0,   0,   0,     0 }  /* quit   */
    };
    if (allowed[state_][state])
    {
        state_hist_.push_back(state_);
        state_ = state;
        if (state_hist_.size() > 10)
        {
            state_hist_.erase(state_hist_.begin());
        }
    }
    else
    {
        std::ostringstream os;
        os << "client_state: Unallowed state transition: "
           << state_ << " -> " << state;
        throw wsrep::runtime_error(os.str());
    }
}

void wsrep::client_state::mode(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
    enum mode mode)
{
    assert(lock.owns_lock());
    static const char allowed[n_modes_][n_modes_] =
    {   /* l  h  t  r */
        {  0, 1, 1, 1 }, /* local     */
        {  1, 0, 1, 0 }, /* high prio */
        {  1, 1, 0, 0 }, /* toi       */
        {  1, 0, 0, 0 }  /* rsu       */
    };
    if (allowed[mode_][mode])
    {
        mode_ = mode;
    }
    else
    {
        std::ostringstream os;
        os << "client_state: Unallowed mode transition: "
           << mode_ << " -> " << mode;
        throw wsrep::runtime_error(os.str());
    }
}