1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-08-09 13:22:47 +03:00
Files
wsrep-lib/src/transaction_context.cpp
Teemu Ollakka 8e501fc03b Fixed several issues with BF aborts and error reporting.
Client context override_error() is now implemented in
client_context base class. On error virtual method will be called
to notify derived classes.

Started refactoring logging utility.
2018-05-02 13:55:38 +03:00

820 lines
23 KiB
C++

//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "transaction_context.hpp"
#include "client_context.hpp"
#include "server_context.hpp"
#include "key.hpp"
#include "data.hpp"
#include "logger.hpp"
#include "compiler.hpp"
#include <sstream>
#include <memory>
// Public
// Construct a transaction context bound to the given client context.
// The provider reference is cached from the client context. The
// transaction starts out inactive (invalid id) in state s_executing.
trrep::transaction_context::transaction_context(
    trrep::client_context& client_context)
    : provider_(client_context.provider())
    , client_context_(client_context)
    , id_(transaction_id::invalid())
    , state_(s_executing)
    , state_hist_()
    , bf_abort_state_(s_executing)
    , bf_abort_client_state_()
    , ws_handle_()
    , trx_meta_()
    , flags_()
    , pa_unsafe_(false)
    , certified_(false)
    , fragments_()
      // NOTE(review): initialized from bool but compared against a
      // transaction_id in certify_fragment() -- confirm the implicit
      // conversion is intended.
    , rollback_replicated_for_(false)
{ }
// Destructor. No explicit cleanup here; provider resources are
// released during normal processing (see after_statement()/cleanup()).
trrep::transaction_context::~transaction_context()
{
}
// Start a new locally initiated transaction with the given id.
//
// For a replicating client the transaction is also started in the
// provider. Returns 0 on success, non-zero on provider failure or
// unknown client mode.
int trrep::transaction_context::start_transaction(
    const trrep::transaction_id& id)
{
    assert(active() == false);
    id_ = id;
    state_ = s_executing;
    state_hist_.clear();
    ws_handle_.trx_id = id_.get();
    // Re-initialize the flags instead of OR'ing into them: cleanup()
    // does not reset flags_, so OR'ing would carry stale flags (e.g.
    // WSREP_FLAG_TRX_END set in certify_commit()) from a previous
    // transaction over into this one.
    flags_ = WSREP_FLAG_TRX_START;
    switch (client_context_.mode())
    {
    case trrep::client_context::m_local:
    case trrep::client_context::m_applier:
        return 0;
    case trrep::client_context::m_replicating:
        return provider_.start_transaction(&ws_handle_);
    default:
        assert(0);
        return 1;
    }
}
// Start applying a transaction that was replicated from elsewhere:
// adopt the write set handle, the transaction meta data and the flags
// supplied by the caller. Applier-side write sets are considered
// certified already. Always returns 0.
int trrep::transaction_context::start_transaction(
    const wsrep_ws_handle_t& ws_handle,
    const wsrep_trx_meta_t& trx_meta,
    uint32_t flags)
{
    assert(active() == false);
    assert(client_context_.mode() == trrep::client_context::m_applier);

    ws_handle_ = ws_handle;
    trx_meta_  = trx_meta;
    flags_     = flags;
    certified_ = true;
    state_     = s_executing;
    return 0;
}
// Append a certification key to the write set. Returns the provider
// status code (0 on success).
int trrep::transaction_context::append_key(const trrep::key& key)
{
    const int result(provider_.append_key(&ws_handle_, &key.get()));
    return result;
}
// Append a data payload to the write set. Returns the provider
// status code (0 on success).
int trrep::transaction_context::append_data(const trrep::data& data)
{
    const int result(provider_.append_data(&ws_handle_, &data.get()));
    return result;
}
// Hook called before the transaction is prepared.
//
// Moves the transaction to s_preparing. For a replicating streaming
// transaction, previously stored fragments are removed (or an error
// is flagged) before the final commit fragment.
//
// Returns 0 on success, 1 if the transaction must abort.
int trrep::transaction_context::before_prepare()
{
    int ret(0);
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("before_prepare_enter");
    assert(state() == s_executing || state() == s_must_abort);
    // A BF abort may have been delivered before this lock was acquired.
    if (state() == s_must_abort)
    {
        assert(client_context_.mode() == trrep::client_context::m_replicating);
        client_context_.override_error(trrep::e_deadlock_error);
        return 1;
    }
    state(lock, s_preparing);
    switch (client_context_.mode())
    {
    case trrep::client_context::m_replicating:
        if (is_streaming())
        {
            client_context_.debug_suicide(
                "crash_last_fragment_commit_before_fragment_removal");
            // Server context call below is made without holding the
            // client context lock.
            lock.unlock();
            // NOTE(review): an error is flagged when the statement IS
            // allowed for streaming, fragments removed otherwise --
            // verify the condition polarity against
            // statement_allowed_for_streaming() semantics.
            if (client_context_.server_context().statement_allowed_for_streaming(
                    client_context_, *this))
            {
                client_context_.override_error(trrep::e_error_during_commit);
                ret = 1;
            }
            else
            {
                remove_fragments();
            }
            lock.lock();
            client_context_.debug_suicide(
                "crash_last_fragment_commit_after_fragment_removal");
        }
        break;
    case trrep::client_context::m_local:
    case trrep::client_context::m_applier:
        // Nothing to prepare for local or applier transactions.
        break;
    }
    assert(state() == s_preparing);
    debug_log_state("before_prepare_leave");
    return ret;
}
// Hook called after the prepare phase. For a replicating master this
// is where the write set is certified and replicated via
// certify_commit(); local and applier transactions just advance the
// state machine.
//
// Returns 0 on success, 1 on failure (BF abort, certification
// failure or replay required).
int trrep::transaction_context::after_prepare()
{
    int ret(1);
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("after_prepare_enter");
    assert(state() == s_preparing || state() == s_must_abort);
    // A BF abort may have been delivered before this lock was
    // acquired.
    // NOTE(review): this early return skips the
    // debug_log_state("after_prepare_leave") call below.
    if (state() == s_must_abort)
    {
        assert(client_context_.mode() == trrep::client_context::m_replicating);
        client_context_.override_error(trrep::e_deadlock_error);
        return 1;
    }
    switch (client_context_.mode())
    {
    case trrep::client_context::m_replicating:
        if (state() == s_preparing)
        {
            // Certify and replicate the final write set. May release
            // and re-acquire the lock internally.
            ret = certify_commit(lock);
            assert((ret == 0 || state() == s_committing) ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed));
        }
        else
        {
            assert(state() == s_must_abort);
            client_context_.override_error(trrep::e_deadlock_error);
        }
        break;
    case trrep::client_context::m_local:
    case trrep::client_context::m_applier:
        // No replication step; advance straight to committing.
        state(lock, s_certifying);
        state(lock, s_committing);
        ret = 0;
        break;
    }
    debug_log_state("after_prepare_leave");
    return ret;
}
// Hook called before the commit of the transaction is performed.
//
// For a replicating master this runs certification (in the one-phase
// case) and then enters the provider commit order critical section.
// Applier and ordered local transactions only enter the commit order
// critical section.
//
// Returns 0 on success, 1 on failure (the transaction then moves
// towards abort or replay).
int trrep::transaction_context::before_commit()
{
    int ret(1);
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("before_commit_enter");
    assert(state() == s_executing ||
           state() == s_committing ||
           state() == s_must_abort ||
           state() == s_replaying);
    switch (client_context_.mode())
    {
    case trrep::client_context::m_local:
        // Local transactions enter commit order only if they have
        // been assigned an ordering by the provider.
        if (ordered())
        {
            ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_);
        }
        break;
    case trrep::client_context::m_replicating:
        // Commit is one phase - before/after prepare was not called
        if (state() == s_executing)
        {
            ret = certify_commit(lock);
            assert((ret == 0 || state() == s_committing)
                   ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed));
        }
        else if (state() != s_committing)
        {
            // BF abort was delivered before this point.
            assert(state() == s_must_abort);
            client_context_.override_error(trrep::e_deadlock_error);
        }
        else
        {
            // 2PC commit, prepare was done before
            ret = 0;
        }
        if (ret == 0)
        {
            // Enter commit order without holding the client context
            // lock; re-check the result under the lock.
            lock.unlock();
            wsrep_status_t status(provider_.commit_order_enter(&ws_handle_, &trx_meta_));
            lock.lock();
            switch (status)
            {
            case WSREP_OK:
                break;
            case WSREP_BF_ABORT:
                // BF aborted while entering commit order.
                if (state() != s_must_abort)
                {
                    state(lock, s_must_abort);
                }
                ret = 1;
                break;
            default:
                // Unexpected provider status.
                ret = 1;
                assert(0);
                break;
            }
        }
        break;
    case trrep::client_context::m_applier:
        assert(ordered());
        ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_);
        if (ret)
        {
            state(lock, s_must_abort);
        }
        else
        {
            if (state() == s_executing)
            {
                // 1pc
                state(lock, s_certifying);
                state(lock, s_committing);
            }
            else if (state() == s_replaying)
            {
                state(lock, s_committing);
            }
        }
        break;
    }
    debug_log_state("before_commit_leave");
    return ret;
}
// Hook called when the commit has been ordered: leaves the provider
// commit order critical section and moves the transaction to
// s_ordered_commit. Returns the provider status (expected 0).
int trrep::transaction_context::ordered_commit()
{
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("ordered_commit_enter");
    assert(state() == s_committing);
    assert(ordered());
    const int ret(provider_.commit_order_leave(&ws_handle_, &trx_meta_));
    // Leaving the commit order critical section should always succeed.
    assert(ret == 0);
    state(lock, s_ordered_commit);
    debug_log_state("ordered_commit_leave");
    return ret;
}
// Hook called after the commit has completed: performs post-commit
// provider bookkeeping and moves the transaction to s_committed.
// Returns 0 on success.
int trrep::transaction_context::after_commit()
{
    int ret(0);
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("after_commit_enter");
    assert(state() == s_ordered_commit);

    // Only the replicating master has provider-side work to do here:
    // streaming fragments are cleared and the write set is released.
    // Local and applier transactions need no post-commit action.
    if (client_context_.mode() == trrep::client_context::m_replicating)
    {
        if (is_streaming())
        {
            clear_fragments();
        }
        ret = provider_.release(&ws_handle_);
    }
    assert(ret == 0);
    state(lock, s_committed);
    debug_log_state("after_commit_leave");
    return ret;
}
// Hook called before the rollback of the transaction is performed.
//
// Decides whether the transaction proceeds to a plain abort or must
// be replayed, and replicates a rollback fragment for streaming
// transactions that will not be replayed. Always returns 0.
int trrep::transaction_context::before_rollback()
{
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("before_rollback_enter");
    assert(state() == s_executing ||
           state() == s_must_abort ||
           state() == s_cert_failed ||
           state() == s_must_replay);
    switch (state())
    {
    case s_must_replay:
        // Replay pending; no state change here.
        break;
    case s_must_abort:
        if (certified())
        {
            // Certified transactions must replay instead of a plain
            // abort.
            state(lock, s_must_replay);
            break;
        }
        /* not certified: fall through and abort like a voluntary or
           certification-failed rollback */
    case s_executing:
    case s_cert_failed:
        if (is_streaming())
        {
            // Replicate rollback fragment
            provider_.rollback(id_.get());
        }
        state(lock, s_aborting);
        break;
    default:
        assert(0);
        break;
    }
    debug_log_state("before_rollback_leave");
    return 0;
}
// Hook called after the rollback has completed. Moves an aborting
// transaction to s_aborted; a transaction heading for replay stays
// in s_must_replay. Always returns 0.
int trrep::transaction_context::after_rollback()
{
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("after_rollback_enter");
    assert(state() == s_aborting || state() == s_must_replay);

    if (state() == s_aborting)
    {
        state(lock, s_aborted);
    }

    // The provider-side release is deliberately deferred to the
    // after_statement() hook: depending on the DBMS, the transaction
    // may still hold resources after the rollback proper, and if the
    // transaction was ordered, the commit order critical section must
    // not be released before those resources are.
    debug_log_state("after_rollback_leave");
    return 0;
}
// Hook called at the end of a statement.
//
// Completes any pending rollback or replay, releases provider
// resources for aborted transactions and resets the context via
// cleanup() once the transaction has reached a terminal state.
//
// Returns 0 on success; a non-zero result implies the transaction
// ended up aborted (see the final assertion).
int trrep::transaction_context::after_statement()
{
    int ret(0);
    trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
    debug_log_state("after_statement_enter");
    assert(state() == s_executing ||
           state() == s_committed ||
           state() == s_aborted ||
           state() == s_must_abort ||
           state() == s_cert_failed ||
           state() == s_must_replay);
    switch (state())
    {
    case s_executing:
        // ?
        break;
    case s_committed:
        // A streaming transaction continues executing across
        // statements.
        if (is_streaming())
        {
            state(lock, s_executing);
        }
        break;
    case s_must_abort:
    case s_cert_failed:
        // Pending BF abort or certification failure: perform the
        // rollback now, without holding the client context lock.
        client_context_.override_error(trrep::e_deadlock_error);
        lock.unlock();
        ret = client_context_.rollback(*this);
        lock.lock();
        break;
    case s_aborted:
        break;
    case s_must_replay:
        // Replay the transaction through the client context; the lock
        // is released for the duration of the replay.
        state(lock, s_replaying);
        lock.unlock();
        ret = client_context_.replay(*this);
        lock.lock();
        provider_.release(&ws_handle_);
        break;
    default:
        assert(0);
        break;
    }
    assert(state() == s_executing ||
           state() == s_committed ||
           state() == s_aborted);
    if (state() == s_aborted)
    {
        // If the write set was ordered, pass through the commit order
        // critical section before releasing it.
        if (ordered())
        {
            ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_);
            if (ret == 0) provider_.commit_order_leave(&ws_handle_, &trx_meta_);
        }
        provider_.release(&ws_handle_);
    }
    // Any terminal state resets the context for the next transaction.
    if (state() != s_executing)
    {
        cleanup();
    }
    debug_log_state("after_statement_leave");
    assert(ret == 0 || state() == s_aborted);
    return ret;
}
// Try to brute force (BF) abort this transaction on behalf of a
// higher priority transaction txc. The caller must hold the
// transaction mutex (the same object as the client context mutex,
// see mutex()).
//
// Returns true if the provider accepted the BF abort and this
// transaction moved to s_must_abort.
bool trrep::transaction_context::bf_abort(
    trrep::unique_lock<trrep::mutex>& lock TRREP_UNUSED,
    const trrep::transaction_context& txc)
{
    bool ret(false);
    assert(lock.owns_lock());
    assert(&lock.mutex() == &mutex());
    if (active() == false)
    {
        trrep::log() << "Transaction not active, skipping bf abort";
    }
    else if (ordered() && seqno() < txc.seqno())
    {
        // A transaction ordered before the aborter may not be BF
        // aborted.
        trrep::log() << "Not allowed to BF abort transaction ordered before "
                     << "aborter: " << seqno() << " < " << txc.seqno();
    }
    else
    {
        switch (state())
        {
        // BF abort is attempted only in these active states.
        case s_executing:
        case s_preparing:
        case s_certifying:
        case s_committing:
        {
            wsrep_seqno_t victim_seqno(WSREP_SEQNO_UNDEFINED);
            wsrep_status_t status(client_context_.provider().bf_abort(
                txc.seqno(), id_.get(), &victim_seqno));
            switch (status)
            {
            case WSREP_OK:
                trrep::log() << "Seqno " << txc.seqno()
                             << " succesfully BF aborted " << id_.get()
                             << " victim_seqno " << victim_seqno;
                // Remember in which state the BF abort hit.
                bf_abort_state_ = state();
                state(lock, s_must_abort);
                ret = true;
                break;
            default:
                trrep::log() << "Seqno " << txc.seqno()
                             << " failed to BF abort " << id_.get()
                             << " with status " << status
                             << " victim_seqno " << victim_seqno;
                break;
            }
            break;
        }
        default:
            trrep::log() << "BF abort not allowed in state "
                         << trrep::to_string(state());
            break;
        }
    }
    if (ret)
    {
        // Record the client state at abort time for later inspection.
        bf_abort_client_state_ = client_context_.state();
        if (client_context_.server_context().rollback_mode() ==
            trrep::server_context::rm_sync)
        {
            //! \todo Launch background rollbacker.
            assert(0);
        }
    }
    return ret;
}
// The transaction shares its mutex with the owning client context.
trrep::mutex& trrep::transaction_context::mutex()
{
    return client_context_.mutex();
}
// Private
// Perform a state transition of the transaction. The caller must
// hold the client context lock. A transition that is not enabled in
// the matrix below is logged and throws trrep::runtime_error.
//
// Matrix legend (presumably in enum declaration order -- confirm
// against the header): ex=executing pr=preparing ce=certifying
// co=committing oc=ordered_commit ct=committed cf=cert_failed
// ma=must_abort ab=aborting ad=aborted mr=must_replay re=replaying.
void trrep::transaction_context::state(
    trrep::unique_lock<trrep::mutex>& lock __attribute__((unused)),
    enum trrep::transaction_context::state next_state)
{
    assert(lock.owns_lock());
    // allowed[current][next] == 1 iff the transition is legal.
    static const char allowed[n_states][n_states] =
    { /* ex pr ce co oc ct cf ma ab ad mr re */
      { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
      { 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */
      { 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
      { 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
      { 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
      { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
      { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */
      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
      { 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0}  /* re */
    };
    if (allowed[state_][next_state])
    {
        // Keep a trail of past states for troubleshooting.
        state_hist_.push_back(state_);
        state_ = next_state;
    }
    else
    {
        std::ostringstream os;
        os << "unallowed state transition for transaction "
           << id_.get() << ": " << trrep::to_string(state_)
           << " -> " << trrep::to_string(next_state);
        trrep::log() << os.str();
        throw trrep::runtime_error(os.str());
    }
}
// Certify and replicate a single streaming replication fragment
// through a separate client/transaction context.
//
// NOTE: the unconditional throw below makes everything after it
// unreachable until the implementation is finished.
int trrep::transaction_context::certify_fragment(
    trrep::unique_lock<trrep::mutex>& lock)
{
    // This method is not fully implemented and tested yet.
    throw trrep::not_implemented_error();
    assert(lock.owns_lock());
    assert(client_context_.mode() == trrep::client_context::m_replicating);
    assert(rollback_replicated_for_ != id_);
    client_context_.wait_for_replayers(lock);
    // BF abort may have arrived while waiting for replayers.
    if (state() == s_must_abort)
    {
        client_context_.override_error(trrep::e_deadlock_error);
        return 1;
    }
    state(lock, s_certifying);
    lock.unlock();
    uint32_t flags(0);
    // The first fragment of the transaction carries the start flag.
    if (fragments_.empty())
    {
        flags |= WSREP_FLAG_TRX_START;
    }
    trrep::data data;
    if (client_context_.prepare_data_for_replication(*this, data))
    {
        lock.lock();
        state(lock, s_must_abort);
        return 1;
    }
    // Client context to store fragment in separate transaction
    // Switch temporarily to sr_transaction_context, switch back
    // to original when this goes out of scope
    // NOTE(review): std::auto_ptr is deprecated (removed in C++17);
    // switch to std::unique_ptr before enabling this code path.
    std::auto_ptr<trrep::client_context> sr_client_context(
        client_context_.server_context().local_client_context());
    trrep::client_context_switch client_context_switch(
        client_context_,
        *sr_client_context);
    trrep::transaction_context sr_transaction_context(*sr_client_context);
    if (sr_client_context->append_fragment(sr_transaction_context, flags, data))
    {
        lock.lock();
        state(lock, s_must_abort);
        client_context_.override_error(trrep::e_append_fragment_error);
        return 1;
    }
    wsrep_status_t cert_ret(provider_.certify(client_context_.id().get(),
                                              &sr_transaction_context.ws_handle_,
                                              flags,
                                              &sr_transaction_context.trx_meta_));
    int ret(0);
    switch (cert_ret)
    {
    case WSREP_OK:
        // Fragment certified: commit it in the side transaction.
        sr_client_context->commit(sr_transaction_context);
        break;
    default:
        sr_client_context->rollback(sr_transaction_context);
        ret = 1;
        break;
    }
    return ret;
}
// Replicate and certify the final (or only) write set of the
// transaction. Called with the client context lock held; the lock is
// released around the provider certify() call and re-acquired before
// inspecting the result.
//
// Returns 0 when the transaction entered s_committing, 1 otherwise
// (abort, certification failure or replay required).
int trrep::transaction_context::certify_commit(
    trrep::unique_lock<trrep::mutex>& lock)
{
    assert(lock.owns_lock());
    assert(id_ != trrep::transaction_id::invalid());
    // May release and re-acquire the lock while waiting.
    client_context_.wait_for_replayers(lock);
    assert(lock.owns_lock());
    // BF abort may have arrived while waiting for replayers.
    if (state() == s_must_abort)
    {
        client_context_.override_error(trrep::e_deadlock_error);
        return 1;
    }
    state(lock, s_certifying);
    // Mark this write set as the closing fragment of the transaction.
    flags(flags() | WSREP_FLAG_TRX_END);
    lock.unlock();
    trrep::data data;
    if (client_context_.prepare_data_for_replication(*this, data))
    {
        // Note: Error must be set by prepare_data_for_replication()
        lock.lock();
        state(lock, s_must_abort);
        return 1;
    }
    // The client may have been killed while the lock was released.
    if (client_context_.killed())
    {
        lock.lock();
        client_context_.override_error(trrep::e_deadlock_error);
        state(lock, s_must_abort);
        return 1;
    }
    wsrep_status cert_ret(provider_.certify(client_context_.id().get(),
                                            &ws_handle_,
                                            flags(),
                                            &trx_meta_));
    lock.lock();
    assert(state() == s_certifying || state() == s_must_abort);
    client_context_.debug_sync("wsrep_after_replication");
    int ret(1);
    switch (cert_ret)
    {
    case WSREP_OK:
        assert(ordered());
        switch (state())
        {
        case s_certifying:
            certified_ = true;
            state(lock, s_committing);
            ret = 0;
            break;
        case s_must_abort:
            // We got BF aborted after successful certification
            // and before acquiring client context lock. This means that
            // the transaction must be replayed.
            client_context_.will_replay(*this);
            state(lock, s_must_replay);
            break;
        default:
            assert(0);
            break;
        }
        // NOTE(review): certified_ is set again here, which also
        // marks the s_must_abort path certified -- confirm this is
        // intentional and not a leftover duplicate of the assignment
        // above.
        certified_ = true;
        break;
    case WSREP_WARNING:
        assert(ordered() == false);
        state(lock, s_must_abort);
        client_context_.override_error(trrep::e_error_during_commit);
        break;
    case WSREP_TRX_MISSING:
        state(lock, s_must_abort);
        // The execution should never reach this point if the
        // transaction has not generated any keys or data.
        client_context_.override_error(trrep::e_error_during_commit);
        assert(0);
        break;
    case WSREP_BF_ABORT:
        // Transaction was replicated successfully and it was either
        // certified successfully or the result of certifying is not
        // yet known. Therefore the transaction must roll back
        // and go through replay either to replay and commit the whole
        // transaction or to determine failed certification status.
        assert(ordered());
        client_context_.will_replay(*this);
        if (state() != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        state(lock, s_must_replay);
        break;
    case WSREP_TRX_FAIL:
        // Certification failed: the transaction must roll back.
        state(lock, s_cert_failed);
        client_context_.override_error(trrep::e_deadlock_error);
        break;
    case WSREP_SIZE_EXCEEDED:
        state(lock, s_must_abort);
        client_context_.override_error(trrep::e_error_during_commit);
        break;
    case WSREP_CONN_FAIL:
    case WSREP_NODE_FAIL:
        // Galera provider may return CONN_FAIL if the trx is
        // BF aborted O_o
        if (state() != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        client_context_.override_error(trrep::e_error_during_commit);
        break;
    case WSREP_FATAL:
        // Unrecoverable provider failure: abort the process via the
        // client context.
        client_context_.abort();
        break;
    case WSREP_NOT_IMPLEMENTED:
    case WSREP_NOT_ALLOWED:
        client_context_.override_error(trrep::e_error_during_commit);
        state(lock, s_must_abort);
        assert(0);
        break;
    default:
        client_context_.override_error(trrep::e_error_during_commit);
        break;
    }
    return ret;
}
// Remove previously stored streaming fragments. Stub: not
// implemented yet.
void trrep::transaction_context::remove_fragments()
{
    throw trrep::not_implemented_error();
}
// Clear the in-memory fragment bookkeeping after commit. Stub: not
// implemented yet.
void trrep::transaction_context::clear_fragments()
{
    throw trrep::not_implemented_error();
}
// Reset the transaction context to an inactive state once the
// transaction has reached a terminal state (or continues streaming).
void trrep::transaction_context::cleanup()
{
    debug_log_state("cleanup_enter");
    id_ = trrep::transaction_id::invalid();
    ws_handle_.trx_id = -1;
    // A streaming transaction keeps executing across statements.
    if (is_streaming())
    {
        state_ = s_executing;
    }
    // Keep the state history for troubleshooting. Reset at start_transaction().
    // state_hist_.clear();
    trx_meta_.gtid = WSREP_GTID_UNDEFINED;
    trx_meta_.stid.node = WSREP_UUID_UNDEFINED;
    trx_meta_.stid.trx = trrep::transaction_id::invalid();
    trx_meta_.stid.conn = trrep::client_id::invalid();
    certified_ = false;
    pa_unsafe_ = false;
    // NOTE(review): flags_ is not reset here -- verify that
    // start_transaction() fully re-initializes it for the next
    // transaction.
    debug_log_state("cleanup_leave");
}
// Log the current transaction state with the given context tag when
// the client debug log level is at least 1.
void trrep::transaction_context::debug_log_state(
    const std::string& context TRREP_UNUSED) const
{
    if (client_context_.debug_log_level() >= 1)
    {
        trrep::log_debug() << context
                           << ": server: " << client_context_.server_context().name()
                           << ": client: " << client_context_.id().get()
                           << " trx: " << int64_t(id_.get())
                           << " state: " << trrep::to_string(state_)
                           << " error: "
                           << trrep::to_string(client_context_.current_error());
    }
}