1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-27 09:01:50 +03:00

Implemented SR transaction rollback.

This commit is contained in:
Teemu Ollakka
2018-07-10 14:01:41 +03:00
parent e916453e6d
commit 80ca03daaf
17 changed files with 362 additions and 91 deletions

View File

@ -20,6 +20,11 @@ int db::high_priority_service::start_transaction(
return client_.client_state().start_transaction(ws_handle, ws_meta);
}
const wsrep::transaction& db::high_priority_service::transaction() const
{
return client_.client_state().transaction();
}
void db::high_priority_service::adopt_transaction(const wsrep::transaction&)
{
throw wsrep::not_implemented_error();

View File

@ -17,6 +17,7 @@ namespace db
high_priority_service(db::server& server, db::client& client);
int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) override;
const wsrep::transaction& transaction() const override;
void adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer&) override;

View File

@ -434,11 +434,17 @@ namespace wsrep
return transaction_.start_replaying(ws_meta);
}
/**
* Adopt a streaming transaction state. This is must be
* called from high_priority_service::adopt_transaction()
* during streaming transaction rollback. The call will
* set up enough context for handling the rollback
* fragment.
*/
void adopt_transaction(const wsrep::transaction& transaction)
{
assert(mode_ == m_high_priority);
transaction_.start_transaction(transaction.id());
transaction_.streaming_context() = transaction.streaming_context();
transaction_.adopt(transaction);
}
/** @name Non-transactional operations */

View File

@ -36,6 +36,12 @@ namespace wsrep
virtual int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
/**
* Return transaction object associated to high priority
* service state.
*/
virtual const wsrep::transaction& transaction() const = 0;
/**
* Adopt a transaction.
*/
@ -83,8 +89,18 @@ namespace wsrep
/**
* Commit a transaction.
* An implementation must call
* wsrep::client_state::prepare_for_ordering() to set
* the ws_handle and ws_meta before the commit if the
* commit process will go through client state commit
* processing. Otherwise the implementation must release
* commit order explicitly via provider.
*
* @param ws_handle Write set handle
* @param ws_meta Write set meta
*/
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
virtual int commit(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta) = 0;
/**
* Roll back a transaction
*/

View File

@ -8,6 +8,8 @@
#include "exception.hpp"
#include "buffer.hpp"
#include <iosfwd>
namespace wsrep
{
class key
@ -60,6 +62,7 @@ namespace wsrep
typedef std::vector<wsrep::key> key_array;
std::ostream& operator<<(std::ostream&, const wsrep::key&);
}
#endif // WSREP_KEY_HPP

View File

@ -262,7 +262,7 @@ namespace wsrep
virtual enum status bf_abort(wsrep::seqno bf_seqno,
wsrep::transaction_id victim_trx,
wsrep::seqno& victim_seqno) = 0;
virtual int rollback(wsrep::transaction_id) = 0;
virtual enum status rollback(wsrep::transaction_id) = 0;
virtual enum status commit_order_enter(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
virtual int commit_order_leave(const wsrep::ws_handle&,

View File

@ -88,6 +88,7 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta);
void adopt(const transaction& transaction);
void fragment_applied(wsrep::seqno seqno);
int prepare_for_ordering(const wsrep::ws_handle& ws_handle,
@ -155,6 +156,7 @@ namespace wsrep
void clear_fragments();
void cleanup();
void debug_log_state(const char*) const;
void debug_log_key_append(const wsrep::key& key);
wsrep::server_service& server_service_;
wsrep::client_service& client_service_;

View File

@ -7,6 +7,7 @@ add_library(wsrep-lib
exception.cpp
gtid.cpp
id.cpp
key.cpp
logger.cpp
provider.cpp
seqno.cpp

View File

@ -36,6 +36,7 @@ void wsrep::client_state::close()
if (transaction_.active())
{
client_service_.bf_rollback();
transaction_.after_statement();
}
debug_log_state("close: leave");
}
@ -421,7 +422,19 @@ void wsrep::client_state::state(
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
enum wsrep::client_state::state state)
{
assert(wsrep::this_thread::get_id() == owning_thread_id_);
// For locally processing client states (local, toi, rsu)
// changing the state is allowed only from the owning thread.
// In high priority mode the processing thread however may
// change and we check only that store_globals() has been
// called by the current thread to gain ownership.
//
// Note that this check assumes that there is always a single
// thread per local client connection. This may not always
// hold and the sanity check mechanism may need to be revised.
assert((mode_ != m_high_priority &&
wsrep::this_thread::get_id() == owning_thread_id_) ||
(mode_ == m_high_priority &&
wsrep::this_thread::get_id() == current_thread_id_));
assert(lock.owns_lock());
static const char allowed[state_max_][state_max_] =
{

36
src/key.cpp Normal file
View File

@ -0,0 +1,36 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "wsrep/key.hpp"
#include <ostream>
#include <iomanip>
namespace
{
void print_key_part(std::ostream& os, const void* ptr, size_t len)
{
std::ios::fmtflags flags_save(os.flags());
os << len << ": ";
for (size_t i(0); i < len; ++i)
{
os << std::hex
<< std::setfill('0')
<< std::setw(2)
<< static_cast<int>(
*(reinterpret_cast<const unsigned char*>(ptr) + i)) << " ";
}
os.flags(flags_save);
}
}
std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::key& key)
{
os << "type: " << key.type();
for (size_t i(0); i < key.size(); ++i)
{
os << "\n ";
print_key_part(os, key.key_parts()[i].data(), key.key_parts()[i].size());
}
return os;
}

View File

@ -28,6 +28,79 @@ namespace
return (bootstrap || cluster_address == "gcomm://");
}
int apply_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service& streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
{
wsrep::high_priority_switch sw(high_priority_service,
streaming_applier);
ret = streaming_applier.apply_write_set(ws_meta, data);
streaming_applier.after_apply();
}
ret = ret || high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
high_priority_service.after_apply();
return ret;
}
int commit_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
// Make high priority switch to go out of scope
// before the streaming applier is released.
{
wsrep::high_priority_switch sw(
high_priority_service, *streaming_applier);
streaming_applier->remove_fragments(ws_meta);
ret = streaming_applier->commit(ws_handle, ws_meta);
streaming_applier->after_apply();
}
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
return ret;
}
int rollback_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret= 0;
// Adopts transaction state and starts a transaction for
// high priority service
high_priority_service.adopt_transaction(
streaming_applier->transaction());
{
wsrep::high_priority_switch ws(
high_priority_service, *streaming_applier);
streaming_applier->rollback();
streaming_applier->after_apply();
}
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
high_priority_service.remove_fragments(ws_meta);
high_priority_service.commit(ws_handle, ws_meta);
return ret;
}
int apply_write_set(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle,
@ -36,6 +109,14 @@ namespace
{
int ret(0);
if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()) &&
wsrep::rolls_back_transaction(ws_meta.flags()))
{
// Non streaming rollback (certification failed)
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
}
else if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()))
{
if (high_priority_service.start_transaction(ws_handle, ws_meta))
@ -66,13 +147,12 @@ namespace
server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sa);
sa->start_transaction(ws_handle, ws_meta);
{
wsrep::high_priority_switch sw(high_priority_service, *sa);
sa->apply_write_set(ws_meta, data);
sa->after_apply();
}
high_priority_service.append_fragment_and_commit(ws_handle, ws_meta, data);
high_priority_service.after_apply();
ret = apply_fragment(server_state,
high_priority_service,
*sa,
ws_handle,
ws_meta,
data);
}
else if (ws_meta.flags() == 0)
{
@ -92,17 +172,13 @@ namespace
}
else
{
wsrep::high_priority_switch sw(high_priority_service, *sa);
ret = sa->apply_write_set(ws_meta, data);
sa->after_apply();
ret = apply_fragment(server_state,
high_priority_service,
*sa,
ws_handle,
ws_meta,
data);
}
ret = ret || high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
if (ret)
{
high_priority_service.rollback();
}
high_priority_service.after_apply();
}
else if (wsrep::commits_transaction(ws_meta.flags()))
{
@ -130,24 +206,48 @@ namespace
}
else
{
// Make high priority switch to go out of scope
// before the streaming applier is released.
{
wsrep::high_priority_switch sw(
high_priority_service, *sa);
sa->remove_fragments(ws_meta);
ret = sa->commit(ws_handle, ws_meta);
sa->after_apply();
}
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(sa);
// Commit fragment consumes sa
ret = commit_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
data);
}
}
}
else if (wsrep::rolls_back_transaction(ws_meta.flags()))
{
wsrep::high_priority_service* sa(
server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sa == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning()
<< "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
}
else
{
// Rollback fragment consumes sa
ret = rollback_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
data);
}
}
else
{
// SR fragment applying not implemented yet
assert(0);
}
return ret;
@ -633,13 +733,6 @@ int wsrep::server_state::on_apply(
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
if (rolls_back_transaction(ws_meta.flags()))
{
provider().commit_order_enter(ws_handle, ws_meta);
// todo: server_service_.log_dummy_write_set();
provider().commit_order_leave(ws_handle, ws_meta);
return 0;
}
if (is_toi(ws_meta.flags()))
{
return apply_toi(provider(), high_priority_service,

View File

@ -111,11 +111,12 @@ int wsrep::transaction::start_transaction(
{
debug_log_state("start_transaction enter");
assert(active() == false);
assert(flags() == 0);
id_ = id;
state_ = s_executing;
state_hist_.clear();
ws_handle_ = wsrep::ws_handle(id);
flags_ |= wsrep::provider::flag::start_transaction;
flags(wsrep::provider::flag::start_transaction);
switch (client_state_.mode())
{
case wsrep::client_state::m_high_priority:
@ -140,12 +141,14 @@ int wsrep::transaction::start_transaction(
{
// assert(ws_meta.flags());
assert(active() == false);
assert(flags() == 0);
id_ = ws_meta.transaction_id();
assert(client_state_.mode() == wsrep::client_state::m_high_priority);
state_ = s_executing;
state_hist_.clear();
ws_handle_ = ws_handle;
ws_meta_ = ws_meta;
flags(wsrep::provider::flag::start_transaction);
certified_ = true;
}
else
@ -156,6 +159,16 @@ int wsrep::transaction::start_transaction(
return 0;
}
void wsrep::transaction::adopt(const wsrep::transaction& transaction)
{
debug_log_state("adopt enter");
assert(transaction.is_streaming());
start_transaction(transaction.id());
flags_ = transaction.flags();
streaming_context_ = transaction.streaming_context();
debug_log_state("adopt leave");
}
void wsrep::transaction::fragment_applied(wsrep::seqno seqno)
{
assert(active());
@ -195,6 +208,7 @@ int wsrep::transaction::append_key(const wsrep::key& key)
/** @todo Collect table level keys for SR commit */
try
{
debug_log_key_append(key);
sr_keys_.insert(key);
return provider().append_key(ws_handle_, key);
}
@ -493,49 +507,62 @@ int wsrep::transaction::before_rollback()
debug_log_state("before_rollback_enter");
assert(state() == s_executing ||
state() == s_must_abort ||
state() == s_aborting || // Background rollbacker
// Background rollbacker or rollback initiated from SE
state() == s_aborting ||
state() == s_cert_failed ||
state() == s_must_replay);
switch (state())
switch (client_state_.mode())
{
case s_executing:
// Voluntary rollback
if (is_streaming())
{
streaming_rollback();
}
state(lock, s_aborting);
break;
case s_must_abort:
if (certified())
{
state(lock, s_must_replay);
}
else
case wsrep::client_state::m_local:
switch (state())
{
case s_executing:
// Voluntary rollback
if (is_streaming())
{
streaming_rollback();
}
state(lock, s_aborting);
break;
case s_must_abort:
if (certified())
{
state(lock, s_must_replay);
}
else
{
if (is_streaming())
{
streaming_rollback();
}
state(lock, s_aborting);
}
break;
case s_cert_failed:
if (is_streaming())
{
streaming_rollback();
}
state(lock, s_aborting);
break;
case s_aborting:
if (is_streaming())
{
provider().rollback(id_.get());
}
break;
case s_must_replay:
break;
default:
assert(0);
break;
}
break;
case s_cert_failed:
if (is_streaming())
{
streaming_rollback();
}
case wsrep::client_state::m_high_priority:
assert(state_ == s_executing);
state(lock, s_aborting);
break;
case s_aborting:
if (is_streaming())
{
provider().rollback(id_.get());
}
break;
case s_must_replay:
break;
default:
assert(0);
break;
@ -551,6 +578,11 @@ int wsrep::transaction::after_rollback()
assert(state() == s_aborting ||
state() == s_must_replay);
if (is_streaming())
{
clear_fragments();
}
if (state() == s_aborting)
{
state(lock, s_aborted);
@ -709,6 +741,16 @@ void wsrep::transaction::after_applying()
{
cleanup();
}
else
{
// State remains executing, so this is a streaming applier.
// Reset the meta data to avoid releasing commit order
// critical section above if the next fragment is rollback
// fragment. Rollback fragment ordering will be handled by
// another instance while removing the fragments from
// storage.
ws_meta_ = wsrep::ws_meta();
}
debug_log_state("after_applying leave");
}
@ -804,6 +846,13 @@ void wsrep::transaction::state(
<< " -> " << to_string(next_state);
}
assert(lock.owns_lock());
// BF aborter is allowed to change the state to must abort and
// further to aborting and aborted if the background rollbacker
// is launched.
assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() ||
next_state == s_must_abort ||
next_state == s_aborting ||
next_state == s_aborted);
static const char allowed[n_states][n_states] =
{ /* ex pr ce co oc ct cf ma ab ad mr re */
{ 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
@ -898,7 +947,7 @@ int wsrep::transaction::certify_fragment(
if (storage_service.append_fragment(
server_id,
id(),
flags_,
flags(),
wsrep::const_buffer(data.data(), data.size())))
{
lock.lock();
@ -911,7 +960,7 @@ int wsrep::transaction::certify_fragment(
enum wsrep::provider::status
cert_ret(provider().certify(client_state_.id().get(),
ws_handle_,
flags_,
flags(),
sr_ws_meta));
switch (cert_ret)
{
@ -933,8 +982,12 @@ int wsrep::transaction::certify_fragment(
ret = 1;
break;
}
provider().release(ws_handle_);
}
// Note: This does not release the handle in the provider
// since streaming is still on. However it is needed to
// make provider internal state to transition for the
// next fragment.
provider().release(ws_handle_);
lock.lock();
if (ret)
{
@ -943,7 +996,7 @@ int wsrep::transaction::certify_fragment(
else
{
state(lock, s_executing);
flags_ &= ~wsrep::provider::flag::start_transaction;
flags(flags() & ~wsrep::provider::flag::start_transaction);
}
return ret;
}
@ -1139,7 +1192,13 @@ int wsrep::transaction::append_sr_keys_for_commit()
void wsrep::transaction::streaming_rollback()
{
debug_log_state("streaming_rollback enter");
assert(streaming_context_.rolled_back() == false);
// Create a high priority applier which will handle the
// rollback fragment or clean up on configuration change.
// Adopt transaction will copy fragment set and appropriate
// meta data. Mark current transaction streaming context
// rolled back.
wsrep::high_priority_service* sa(
server_service_.streaming_applier_service(
client_state_.client_service()));
@ -1147,8 +1206,14 @@ void wsrep::transaction::streaming_rollback()
client_state_.server_state().id(), id(), sa);
sa->adopt_transaction(*this);
streaming_context_.cleanup();
// Replicate rollback fragment
provider().rollback(id_.get());
streaming_context_.rolled_back(id_);
enum wsrep::provider::status ret;
if ((ret = provider().rollback(id_.get())))
{
wsrep::log_warning() << "Failed to replicate rollback fragment for "
<< id_.get() << ": " << ret;
}
debug_log_state("streaming_rollback leave");
}
void wsrep::transaction::clear_fragments()
@ -1174,8 +1239,10 @@ void wsrep::transaction::cleanup()
bf_abort_provider_status_ = wsrep::provider::success;
bf_abort_client_state_ = 0;
ws_meta_ = wsrep::ws_meta();
flags_ = 0;
certified_ = false;
pa_unsafe_ = false;
streaming_context_.cleanup();
client_service_.cleanup_transaction();
debug_log_state("cleanup_leave");
}
@ -1185,11 +1252,30 @@ void wsrep::transaction::debug_log_state(
{
WSREP_TC_LOG_DEBUG(
1, context
<< "(" << client_state_.id().get()
<< "," << int64_t(id_.get())
<< "," << ws_meta_.seqno().get()
<< "," << wsrep::to_string(state_)
<< "," << wsrep::to_string(bf_abort_state_)
<< "," << wsrep::to_string(client_state_.current_error())
<< ")");
<< "\n server: " << client_state_.server_state().id()
<< ", client: " << client_state_.id().get()
<< ", state: " << wsrep::to_c_string(client_state_.state())
<< ", mode: " << wsrep::to_c_string(client_state_.mode())
<< "\n trx_id: " << int64_t(id_.get())
<< ", seqno: " << ws_meta_.seqno().get()
<< ", flags: " << flags()
<< "\n"
<< " state: " << wsrep::to_c_string(state_)
<< ", bfa_state: " << wsrep::to_c_string(bf_abort_state_)
<< ", error: " << wsrep::to_c_string(client_state_.current_error())
<< "\n"
<< " is_sr: " << is_streaming()
<< ", frags: " << streaming_context_.fragments_certified()
<< ", bytes: " << streaming_context_.bytes_certified()
<< ", sr_rb: " << streaming_context_.rolled_back()
<< "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
<< "");
}
void wsrep::transaction::debug_log_key_append(const wsrep::key& key)
{
WSREP_TC_LOG_DEBUG(2, "key_append"
<< "trx_id: "
<< int64_t(id().get())
<< " append key: " << key);
}

View File

@ -631,6 +631,12 @@ wsrep::wsrep_provider_v26::bf_abort(
return map_return_value(ret);
}
enum wsrep::provider::status
wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id)
{
return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0));
}
enum wsrep::provider::status
wsrep::wsrep_provider_v26::commit_order_enter(
const wsrep::ws_handle& ws_handle,

View File

@ -42,7 +42,7 @@ namespace wsrep
bf_abort(wsrep::seqno,
wsrep::transaction_id,
wsrep::seqno&);
int rollback(const wsrep::transaction_id) { ::abort(); return 0; }
enum wsrep::provider::status rollback(const wsrep::transaction_id);
enum wsrep::provider::status
commit_order_enter(const wsrep::ws_handle&,
const wsrep::ws_meta&);

View File

@ -35,11 +35,12 @@ int wsrep::mock_high_priority_service::apply_write_set(
return (fail_next_applying_ ? 1 : 0);
}
int wsrep::mock_high_priority_service::commit(const wsrep::ws_handle&,
const wsrep::ws_meta&)
int wsrep::mock_high_priority_service::commit(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
int ret(0);
client_state_->prepare_for_ordering(ws_handle, ws_meta, true);
if (client_state_->client_service().do_2pc())
{
ret = client_state_->before_prepare() ||

View File

@ -31,6 +31,8 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
const wsrep::transaction& transaction() const
{ return client_state_->transaction(); }
void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer&) WSREP_OVERRIDE;

View File

@ -131,11 +131,11 @@ namespace wsrep
enum wsrep::provider::status
append_data(wsrep::ws_handle&, const wsrep::const_buffer&)
{ return wsrep::provider::success; }
int rollback(const wsrep::transaction_id)
enum wsrep::provider::status rollback(const wsrep::transaction_id)
{
++fragments_;
++rollback_fragments_;
return 0;
return wsrep::provider::success;
}
enum wsrep::provider::status
commit_order_enter(const wsrep::ws_handle& ws_handle,