1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-17 13:22:00 +03:00

* Unit test for idle client BF abort.

* Fixes to seqno conversion between provider and provider library.
* Server context applying side fixes.
This commit is contained in:
Teemu Ollakka
2018-06-10 19:27:09 +03:00
parent 2619615e02
commit e18c9d597f
12 changed files with 137 additions and 50 deletions

View File

@ -210,6 +210,7 @@ namespace wsrep
assert(state_ == s_exec); assert(state_ == s_exec);
return transaction_.start_transaction(); return transaction_.start_transaction();
} }
int start_transaction(const wsrep::transaction_id& id) int start_transaction(const wsrep::transaction_id& id)
{ {
assert(state_ == s_exec); assert(state_ == s_exec);
@ -265,12 +266,12 @@ namespace wsrep
} }
int before_rollback() int before_rollback()
{ {
assert(state_ == s_exec); assert(state_ == s_idle || state_ == s_exec || state_ == s_result);
return transaction_.before_rollback(); return transaction_.before_rollback();
} }
int after_rollback() int after_rollback()
{ {
assert(state_ == s_exec); assert(state_ == s_idle || state_ == s_exec || state_ == s_result);
return transaction_.after_rollback(); return transaction_.after_rollback();
} }
@ -319,6 +320,14 @@ namespace wsrep
* \return Client mode. * \return Client mode.
*/ */
enum mode mode() const { return mode_; } enum mode mode() const { return mode_; }
/*!
* Get Client state.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client state
*/
enum state state() const { return state_; }
const wsrep::transaction_context& transaction() const const wsrep::transaction_context& transaction() const
{ {
@ -369,20 +378,14 @@ namespace wsrep
* Friend declarations * Friend declarations
*/ */
friend int server_context::on_apply(client_context&, friend int server_context::on_apply(client_context&,
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::data&); const wsrep::data&);
friend class client_context_switch; friend class client_context_switch;
friend class client_applier_mode; friend class client_applier_mode;
friend class client_toi_mode; friend class client_toi_mode;
friend class transaction_context; friend class transaction_context;
/*!
* Get Client state.
*
* \todo Enforce mutex protection if called from other threads.
*
* \return Client state
*/
enum state state() const { return state_; }
/*! /*!
* Set client state. * Set client state.

View File

@ -69,6 +69,7 @@ namespace wsrep
{ {
return &mutex_; return &mutex_;
} }
private: private:
pthread_mutex_t mutex_; pthread_mutex_t mutex_;
}; };

View File

@ -295,7 +295,7 @@ namespace wsrep
const wsrep::gtid& gtid, const wsrep::gtid& gtid,
bool bypass) = 0; bool bypass) = 0;
virtual void background_rollback(wsrep::client_context&) = 0;
/*! /*!
* *
*/ */
@ -333,6 +333,8 @@ namespace wsrep
* \return Zero on success, non-zero on failure. * \return Zero on success, non-zero on failure.
*/ */
int on_apply(wsrep::client_context& client_context, int on_apply(wsrep::client_context& client_context,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::data& data); const wsrep::data& data);
/*! /*!

View File

@ -83,6 +83,7 @@ namespace wsrep
assert(ws_meta_.transaction_id() != transaction_id::invalid()); assert(ws_meta_.transaction_id() != transaction_id::invalid());
return start_transaction(ws_meta_.transaction_id()); return start_transaction(ws_meta_.transaction_id());
} }
int start_transaction(const wsrep::transaction_id& id); int start_transaction(const wsrep::transaction_id& id);
int start_transaction(const wsrep::ws_handle& ws_handle, int start_transaction(const wsrep::ws_handle& ws_handle,
@ -121,8 +122,8 @@ namespace wsrep
} }
wsrep::mutex& mutex(); wsrep::mutex& mutex();
wsrep::ws_handle& ws_handle() { return ws_handle_; } wsrep::ws_handle& ws_handle() { return ws_handle_; }
const wsrep::ws_handle& ws_handle() const { return ws_handle_; }
const wsrep::ws_meta& ws_meta() const { return ws_meta_; } const wsrep::ws_meta& ws_meta() const { return ws_meta_; }
private: private:
transaction_context(const transaction_context&); transaction_context(const transaction_context&);

View File

@ -35,6 +35,7 @@ int wsrep::client_context::before_command()
(transaction_.state() == wsrep::transaction_context::s_must_abort || (transaction_.state() == wsrep::transaction_context::s_must_abort ||
transaction_.state() == wsrep::transaction_context::s_aborted)) transaction_.state() == wsrep::transaction_context::s_aborted))
{ {
override_error(wsrep::e_deadlock_error);
return 1; return 1;
} }
return 0; return 0;
@ -67,13 +68,23 @@ void wsrep::client_context::after_command_after_result()
{ {
// Note: Error is not overridden here as the result has already // Note: Error is not overridden here as the result has already
// been sent to client. The error should be set in before_command() // been sent to client. The error should be set in before_command()
// when the client issues next command. // when the client issues next command and finds the transaction
// in aborted state.
lock.unlock(); lock.unlock();
rollback(); rollback();
transaction_.after_statement(); transaction_.after_statement();
lock.lock(); lock.lock();
assert(transaction_.state() == wsrep::transaction_context::s_aborted); assert(transaction_.state() == wsrep::transaction_context::s_aborted);
assert(current_error() != wsrep::e_success); assert(current_error() == wsrep::e_success);
}
else if (transaction_.active() &&
transaction_.state() == wsrep::transaction_context::s_aborted)
{
// Will clean up the transaction
lock.unlock();
(void)transaction_.after_statement();
lock.lock();
current_error_ = wsrep::e_success;
} }
state(lock, s_idle); state(lock, s_idle);
} }

View File

@ -253,7 +253,12 @@ public:
{ {
simulator_.donate_sst(*this, req, gtid, bypass); simulator_.donate_sst(*this, req, gtid, bypass);
} }
void background_rollback(wsrep::client_context& cs) override
{
assert(0);
cs.before_rollback();
cs.after_rollback();
}
// Client context management // Client context management
wsrep::client_context* local_client_context(); wsrep::client_context* local_client_context();

View File

@ -43,6 +43,12 @@ namespace wsrep
void on_sst_request(const std::string&, void on_sst_request(const std::string&,
const wsrep::gtid&, const wsrep::gtid&,
bool) WSREP_OVERRIDE { } bool) WSREP_OVERRIDE { }
void background_rollback(wsrep::client_context& client_context)
WSREP_OVERRIDE
{
client_context.before_rollback();
client_context.after_rollback();
}
// void sst_received(const wsrep_gtid_t&, int) WSREP_OVERRIDE { } // void sst_received(const wsrep_gtid_t&, int) WSREP_OVERRIDE { }
// void on_apply(wsrep::transaction_context&) { } // void on_apply(wsrep::transaction_context&) { }
// void on_commit(wsrep::transaction_context&) { } // void on_commit(wsrep::transaction_context&) { }

View File

@ -152,15 +152,9 @@ namespace
meta->stid.trx, meta->stid.trx,
meta->stid.conn), meta->depends_on, meta->stid.conn), meta->depends_on,
map_flags_from_native(flags)); map_flags_from_native(flags));
if (client_context->transaction().state() !=
wsrep::transaction_context::s_replaying &&
client_context->start_transaction(ws_handle, ws_meta))
{
ret = WSREP_CB_FAILURE;
}
if (ret == WSREP_CB_SUCCESS && if (ret == WSREP_CB_SUCCESS &&
client_context->server_context().on_apply( client_context->server_context().on_apply(
*client_context, data)) *client_context, ws_handle, ws_meta, data))
{ {
ret = WSREP_CB_FAILURE; ret = WSREP_CB_FAILURE;
} }
@ -337,24 +331,27 @@ void wsrep::server_context::on_sync()
int wsrep::server_context::on_apply( int wsrep::server_context::on_apply(
wsrep::client_context& client_context, wsrep::client_context& client_context,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::data& data) const wsrep::data& data)
{ {
int ret(0); int ret(0);
const wsrep::transaction_context& txc(client_context.transaction()); const wsrep::transaction_context& txc(client_context.transaction());
const wsrep::ws_meta& ws_meta(txc.ws_meta());
// wsrep::log_debug() << "server_context::on apply flags: " // wsrep::log_debug() << "server_context::on apply flags: "
// << flags_to_string(ws_meta.flags()); // << flags_to_string(ws_meta.flags());
assert(ws_handle.opaque());
assert(ws_meta.flags()); assert(ws_meta.flags());
bool not_replaying(txc.state() !=
wsrep::transaction_context::s_replaying);
if (starts_transaction(ws_meta) && commits_transaction(ws_meta)) if (starts_transaction(ws_meta) && commits_transaction(ws_meta))
{ {
bool not_replaying(txc.state() !=
wsrep::transaction_context::s_replaying);
if (not_replaying) if (not_replaying)
{ {
client_context.before_command(); client_context.before_command();
client_context.before_statement(); client_context.before_statement();
assert(txc.active() == false); assert(txc.active() == false);
client_context.start_transaction(); client_context.start_transaction(ws_handle, ws_meta);
} }
if (client_context.apply(data)) if (client_context.apply(data))
{ {
@ -384,8 +381,10 @@ int wsrep::server_context::on_apply(
// SR not implemented yet // SR not implemented yet
assert(0); assert(0);
} }
if (not_replaying)
assert(txc.active() == false); {
assert(txc.active() == false);
}
return ret; return ret;
} }

View File

@ -18,14 +18,23 @@ namespace
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_context::m_applier, wsrep::client_context::m_applier,
false) false)
, ws_handle(1, (void*)1)
, ws_meta(wsrep::gtid(wsrep::id("1"), 1),
wsrep::stid(wsrep::id("1"), 1, 1),
0,
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit)
{ {
wsrep_mock::start_applying_transaction( // wsrep_mock::start_applying_transaction(
cc, 1, 1, // cc, 1, 1,
wsrep::provider::flag::start_transaction | // wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit); // wsrep::provider::flag::commit);
cc.start_transaction(ws_handle, ws_meta);
} }
wsrep::mock_server_context sc; wsrep::mock_server_context sc;
wsrep::mock_client_context cc; wsrep::mock_client_context cc;
wsrep::ws_handle ws_handle;
wsrep::ws_meta ws_meta;
}; };
} }
@ -35,7 +44,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc,
{ {
cc.debug_log_level(1); cc.debug_log_level(1);
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 0); BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 0);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
// ::abort(); // ::abort();
BOOST_REQUIRE_MESSAGE( BOOST_REQUIRE_MESSAGE(
@ -48,7 +58,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc,
applying_server_fixture) applying_server_fixture)
{ {
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 0); BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 0);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_committed); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_committed);
} }
@ -60,7 +71,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc_rollback,
{ {
cc.fail_next_applying(true); cc.fail_next_applying(true);
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 1); BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 1);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted);
} }
@ -72,7 +84,8 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback,
{ {
cc.fail_next_applying(true); cc.fail_next_applying(true);
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, wsrep::data(buf, 1)) == 1); BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::data(buf, 1)) == 1);
const wsrep::transaction_context& txc(cc.transaction()); const wsrep::transaction_context& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction_context::s_aborted);
} }

View File

@ -336,6 +336,7 @@ int wsrep::transaction_context::before_rollback()
debug_log_state("before_rollback_enter"); debug_log_state("before_rollback_enter");
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_must_abort || state() == s_must_abort ||
state() == s_aborting || // Background rollbacker
state() == s_cert_failed || state() == s_cert_failed ||
state() == s_must_replay); state() == s_must_replay);
@ -373,6 +374,12 @@ int wsrep::transaction_context::before_rollback()
} }
state(lock, s_aborting); state(lock, s_aborting);
break; break;
case s_aborting:
if (is_streaming())
{
provider_.rollback(id_.get());
}
break;
case s_must_replay: case s_must_replay:
break; break;
default: default:
@ -538,11 +545,18 @@ bool wsrep::transaction_context::bf_abort(
if (ret) if (ret)
{ {
bf_abort_client_state_ = client_context_.state(); bf_abort_client_state_ = client_context_.state();
if (client_context_.server_context().rollback_mode() == if (client_context_.state() == wsrep::client_context::s_idle &&
client_context_.server_context().rollback_mode() ==
wsrep::server_context::rm_sync) wsrep::server_context::rm_sync)
{ {
//! \todo Launch background rollbacker. // We need to change the state to aborting under the
assert(0); // lock protection to avoid a race between client thread,
// otherwise it could happend that the client gains control
// between releasing the lock and before background
// rollbacker gets control.
state(lock, wsrep::transaction_context::s_aborting);
lock.unlock();
client_context_.server_context().background_rollback(client_context_);
} }
} }
return ret; return ret;

View File

@ -17,7 +17,7 @@ namespace
struct replicating_client_fixture struct replicating_client_fixture
{ {
replicating_client_fixture() replicating_client_fixture()
: sc("s1", "s1", wsrep::server_context::rm_async) : sc("s1", "s1", wsrep::server_context::rm_sync)
, cc(sc, wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_context::m_replicating) wsrep::client_context::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
@ -361,6 +361,28 @@ BOOST_FIXTURE_TEST_CASE(
BOOST_REQUIRE(cc.current_error() == wsrep::e_success); BOOST_REQUIRE(cc.current_error() == wsrep::e_success);
} }
BOOST_FIXTURE_TEST_CASE(transaction_context_1pc_bf_abort_idle_client,
replicating_client_fixture)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
cc.after_statement();
BOOST_REQUIRE(cc.state() == wsrep::client_context::s_exec);
cc.after_command_before_result();
BOOST_REQUIRE(cc.state() == wsrep::client_context::s_result);
cc.after_command_after_result();
BOOST_REQUIRE(cc.state() == wsrep::client_context::s_idle);
wsrep_mock::bf_abort_unordered(cc);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_aborted);
BOOST_REQUIRE(cc.before_command() == 1);
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
cc.after_command_before_result();
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
cc.after_command_after_result();
BOOST_REQUIRE(cc.current_error() == wsrep::e_success);
BOOST_REQUIRE(tc.active() == false);
}
BOOST_FIXTURE_TEST_CASE(transaction_context_1pc_applying, BOOST_FIXTURE_TEST_CASE(transaction_context_1pc_applying,
applying_client_fixture) applying_client_fixture)
{ {

View File

@ -59,6 +59,15 @@ namespace
throw wsrep::runtime_error("Invalid key type"); throw wsrep::runtime_error("Invalid key type");
} }
static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
{
return (seqno.nil() ? WSREP_SEQNO_UNDEFINED : seqno.get());
}
static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
{
return (seqno == WSREP_SEQNO_UNDEFINED ? 0 : seqno);
}
inline uint32_t map_one(const int flags, const int from, inline uint32_t map_one(const int flags, const int from,
const uint32_t to) const uint32_t to)
{ {
@ -138,7 +147,7 @@ namespace
mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags) mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
: ws_meta_(ws_meta) : ws_meta_(ws_meta)
, trx_meta_() , trx_meta_()
, flags_(map_flags_to_native(flags)) , flags_(flags)
{ } { }
~mutable_ws_meta() ~mutable_ws_meta()
@ -147,12 +156,12 @@ namespace
wsrep::gtid( wsrep::gtid(
wsrep::id(trx_meta_.gtid.uuid.data, wsrep::id(trx_meta_.gtid.uuid.data,
sizeof(trx_meta_.gtid.uuid.data)), sizeof(trx_meta_.gtid.uuid.data)),
trx_meta_.gtid.seqno), seqno_from_native(trx_meta_.gtid.seqno)),
wsrep::stid(wsrep::id(trx_meta_.stid.node.data, wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
sizeof(trx_meta_.stid.node.data)), sizeof(trx_meta_.stid.node.data)),
trx_meta_.stid.trx, trx_meta_.stid.trx,
trx_meta_.stid.conn), trx_meta_.stid.conn),
trx_meta_.depends_on, flags_); seqno_from_native(trx_meta_.depends_on), flags_);
} }
wsrep_trx_meta* native() { return &trx_meta_; } wsrep_trx_meta* native() { return &trx_meta_; }
@ -172,12 +181,12 @@ namespace
{ {
std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(), std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
sizeof(trx_meta_.gtid.uuid.data)); sizeof(trx_meta_.gtid.uuid.data));
trx_meta_.gtid.seqno = ws_meta.seqno().get(); trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(), std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
sizeof(trx_meta_.stid.node.data)); sizeof(trx_meta_.stid.node.data));
trx_meta_.stid.conn = ws_meta.client_id().get(); trx_meta_.stid.conn = ws_meta.client_id().get();
trx_meta_.stid.trx = ws_meta.transaction_id().get(); trx_meta_.stid.trx = ws_meta.transaction_id().get();
trx_meta_.depends_on = ws_meta.depends_on().get(); trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
} }
~const_ws_meta() ~const_ws_meta()
@ -286,9 +295,9 @@ int wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
enum wsrep::provider::status enum wsrep::provider::status
wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id, wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
wsrep::ws_handle& ws_handle, wsrep::ws_handle& ws_handle,
int flags, int flags,
wsrep::ws_meta& ws_meta) wsrep::ws_meta& ws_meta)
{ {
mutable_ws_handle mwsh(ws_handle); mutable_ws_handle mwsh(ws_handle);
mutable_ws_meta mmeta(ws_meta, flags); mutable_ws_meta mmeta(ws_meta, flags);
@ -307,8 +316,9 @@ wsrep::wsrep_provider_v26::bf_abort(
wsrep_seqno_t wsrep_victim_seqno; wsrep_seqno_t wsrep_victim_seqno;
wsrep_status_t ret( wsrep_status_t ret(
wsrep_->abort_certification( wsrep_->abort_certification(
wsrep_, bf_seqno.get(), victim_id.get(), &wsrep_victim_seqno)); wsrep_, seqno_to_native(bf_seqno),
victim_seqno = wsrep_victim_seqno; victim_id.get(), &wsrep_victim_seqno));
victim_seqno = seqno_from_native(wsrep_victim_seqno);
return map_return_value(ret); return map_return_value(ret);
} }