1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Removing client_state dependency from client_service.

This commit is contained in:
Teemu Ollakka
2018-06-18 10:21:02 +03:00
parent af3119a58b
commit 03043d3f25
12 changed files with 159 additions and 175 deletions

View File

@ -5,8 +5,7 @@
#include "db_client_service.hpp" #include "db_client_service.hpp"
#include "db_client.hpp" #include "db_client.hpp"
int db::client_service::apply(wsrep::client_state&, int db::client_service::apply(const wsrep::const_buffer&)
const wsrep::const_buffer&)
{ {
db::client* client(client_state_.client()); db::client* client(client_state_.client());
client->se_trx_.start(client); client->se_trx_.start(client);
@ -14,8 +13,7 @@ int db::client_service::apply(wsrep::client_state&,
return 0; return 0;
} }
int db::client_service::commit(wsrep::client_state&, int db::client_service::commit(const wsrep::ws_handle&,
const wsrep::ws_handle&,
const wsrep::ws_meta&) const wsrep::ws_meta&)
{ {
db::client* client(client_state_.client()); db::client* client(client_state_.client());
@ -26,7 +24,7 @@ int db::client_service::commit(wsrep::client_state&,
return ret; return ret;
} }
int db::client_service::rollback(wsrep::client_state&) int db::client_service::rollback()
{ {
db::client* client(client_state_.client()); db::client* client(client_state_.client());
int ret(client_state_.before_rollback()); int ret(client_state_.before_rollback());
@ -37,11 +35,11 @@ int db::client_service::rollback(wsrep::client_state&)
return ret; return ret;
} }
enum wsrep::provider::status enum wsrep::provider::status
db::client_service::replay(wsrep::client_state&, db::client_service::replay()
wsrep::transaction& transaction)
{ {
wsrep::high_priority_context high_priority_context(client_state_); wsrep::high_priority_context high_priority_context(client_state_);
auto ret(provider_.replay(transaction.ws_handle(), auto ret(provider_.replay(const_cast<wsrep::ws_handle&>(
client_state_.transaction().ws_handle()),
&client_state_)); &client_state_));
if (ret == wsrep::provider::success) if (ret == wsrep::provider::success)
{ {

View File

@ -46,7 +46,7 @@ namespace db
client_state_.store_globals(); client_state_.store_globals();
} }
int prepare_data_for_replication(wsrep::client_state&, const wsrep::transaction&) override int prepare_data_for_replication() override
{ {
return 0; return 0;
} }
@ -56,30 +56,25 @@ namespace db
return 0; return 0;
} }
int prepare_fragment_for_replication(wsrep::client_state&, int prepare_fragment_for_replication(wsrep::mutable_buffer&) override
const wsrep::transaction&,
wsrep::mutable_buffer&) override
{ {
return 0; return 0;
} }
void remove_fragments(const wsrep::transaction&) override void remove_fragments() override
{ } { }
int apply(wsrep::client_state&, const wsrep::const_buffer&) override; int apply(const wsrep::const_buffer&) override;
int commit(wsrep::client_state&, int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(wsrep::client_state&) override; int rollback() override;
void will_replay(const wsrep::transaction&) override void will_replay() override
{ } { }
void wait_for_replayers(wsrep::client_state&, void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override { }
wsrep::unique_lock<wsrep::mutex>&) override { } enum wsrep::provider::status replay()
enum wsrep::provider::status replay(wsrep::client_state&,
wsrep::transaction&)
override; override;
int append_fragment(const wsrep::transaction&, int, int append_fragment(const wsrep::transaction&, int,
@ -89,7 +84,7 @@ namespace db
} }
void emergency_shutdown() { ::abort(); } void emergency_shutdown() { ::abort(); }
void debug_sync(wsrep::client_state&, const char*) override { } void debug_sync(const char*) override { }
void debug_crash(const char*) override { } void debug_crash(const char*) override { }
private: private:
db::client_state& client_state_; db::client_state& client_state_;

View File

@ -22,7 +22,6 @@
namespace wsrep namespace wsrep
{ {
class transaction; class transaction;
class client_state;
class client_service class client_service
{ {
public: public:
@ -60,15 +59,14 @@ namespace wsrep
/** /**
* Set up a data for replication. * Set up a data for replication.
*/ */
virtual int prepare_data_for_replication(wsrep::client_state&, const wsrep::transaction&) = 0; virtual int prepare_data_for_replication() = 0;
// //
// Streaming // Streaming
// //
virtual size_t bytes_generated() const = 0; virtual size_t bytes_generated() const = 0;
virtual int prepare_fragment_for_replication( virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0;
wsrep::client_state&, const wsrep::transaction&, wsrep::mutable_buffer&) = 0; virtual void remove_fragments() = 0;
virtual void remove_fragments(const wsrep::transaction&) = 0;
// //
// Applying interface // Applying interface
@ -77,17 +75,17 @@ namespace wsrep
/** /**
* Apply a write set. * Apply a write set.
*/ */
virtual int apply(wsrep::client_state&, const wsrep::const_buffer&) = 0; virtual int apply(const wsrep::const_buffer&) = 0;
/** /**
* Commit transaction. * Commit transaction.
*/ */
virtual int commit(wsrep::client_state&, const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
/** /**
* Roll back transaction. * Roll back transaction.
*/ */
virtual int rollback(wsrep::client_state&) = 0; virtual int rollback() = 0;
// //
// Interface to global server state // Interface to global server state
@ -109,7 +107,7 @@ namespace wsrep
* @todo This should not be visible to DBMS level, should be * @todo This should not be visible to DBMS level, should be
* handled internally by wsrep-lib. * handled internally by wsrep-lib.
*/ */
virtual void will_replay(const wsrep::transaction&) = 0; virtual void will_replay() = 0;
/** /**
* Replay the current transaction. The implementation must put * Replay the current transaction. The implementation must put
@ -119,9 +117,7 @@ namespace wsrep
* @todo This should not be visible to DBMS level, should be * @todo This should not be visible to DBMS level, should be
* handled internally by wsrep-lib. * handled internally by wsrep-lib.
*/ */
virtual enum wsrep::provider::status replay( virtual enum wsrep::provider::status replay() = 0;
wsrep::client_state&,
wsrep::transaction&) = 0;
/** /**
* Wait until all replaying transactions have been finished * Wait until all replaying transactions have been finished
@ -130,7 +126,7 @@ namespace wsrep
* @todo This should not be visible to DBMS level, should be * @todo This should not be visible to DBMS level, should be
* handled internally by wsrep-lib. * handled internally by wsrep-lib.
*/ */
virtual void wait_for_replayers(wsrep::client_state&, wsrep::unique_lock<wsrep::mutex>&) = 0; virtual void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) = 0;
// Streaming replication // Streaming replication
/** /**
@ -146,7 +142,7 @@ namespace wsrep
* *
* @params sync_point Name of the debug sync point. * @params sync_point Name of the debug sync point.
*/ */
virtual void debug_sync(wsrep::client_state&, const char* sync_point) = 0; virtual void debug_sync(const char* sync_point) = 0;
/** /**
* Forcefully kill the process if the crash_point has * Forcefully kill the process if the crash_point has

View File

@ -242,9 +242,9 @@ namespace wsrep
return transaction_.append_data(data); return transaction_.append_data(data);
} }
int prepare_data_for_replication(const wsrep::transaction& tc) int prepare_data_for_replication()
{ {
return client_service_.prepare_data_for_replication(*this, tc); return client_service_.prepare_data_for_replication();
} }
/** @} */ /** @} */
@ -286,12 +286,9 @@ namespace wsrep
} }
/** @todo deprecate */ /** @todo deprecate */
int prepare_fragment_for_replication( int prepare_fragment_for_replication(wsrep::mutable_buffer& mb)
const wsrep::transaction& tc,
wsrep::mutable_buffer& mb)
{ {
return client_service_.prepare_fragment_for_replication( return client_service_.prepare_fragment_for_replication(mb);
*this, tc, mb);
} }
/** @todo deprecate */ /** @todo deprecate */
@ -310,7 +307,7 @@ namespace wsrep
*/ */
void remove_fragments() void remove_fragments()
{ {
client_service_.remove_fragments(transaction_); client_service_.remove_fragments();
} }
/** @} */ /** @} */
@ -323,17 +320,17 @@ namespace wsrep
return transaction_.start_transaction(wsh, meta); return transaction_.start_transaction(wsh, meta);
} }
/** @todo deprecate */
int apply(const wsrep::const_buffer& data) int apply(const wsrep::const_buffer& data)
{ {
assert(mode_ == m_high_priority); assert(mode_ == m_high_priority);
return client_service_.apply(*this, data); return client_service_.apply(data);
} }
int commit() int commit()
{ {
assert(mode_ == m_high_priority || mode_ == m_local); assert(mode_ == m_high_priority || mode_ == m_local);
return client_service_.commit( return client_service_.commit(
*this,
transaction_.ws_handle(), transaction_.ws_meta()); transaction_.ws_handle(), transaction_.ws_meta());
} }
/** @} */ /** @} */
@ -377,7 +374,7 @@ namespace wsrep
/** @{ */ /** @{ */
int rollback() int rollback()
{ {
return client_service_.rollback(*this); return client_service_.rollback();
} }
int before_rollback() int before_rollback()
@ -419,23 +416,25 @@ namespace wsrep
transaction_.streaming_context_ = transaction.streaming_context_; transaction_.streaming_context_ = transaction.streaming_context_;
} }
/** @todo deprecate */
enum wsrep::provider::status replay( enum wsrep::provider::status replay(
wsrep::transaction& tc) wsrep::transaction&)
{ {
return client_service_.replay(*this, tc); return client_service_.replay();
} }
// //
// //
// //
void will_replay(const wsrep::transaction& tc) void will_replay(const wsrep::transaction&)
{ {
client_service_.will_replay(tc); client_service_.will_replay();
} }
/** @todo deprecate */
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock) void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
{ {
client_service_.wait_for_replayers(*this, lock); client_service_.wait_for_replayers(lock);
} }
bool interrupted() const bool interrupted() const
@ -503,7 +502,7 @@ namespace wsrep
// //
void debug_sync(const char* sync_point) void debug_sync(const char* sync_point)
{ {
client_service_.debug_sync(*this, sync_point); client_service_.debug_sync(sync_point);
} }
void debug_crash(const char* crash_point) void debug_crash(const char* crash_point)

View File

@ -58,7 +58,7 @@ int wsrep::client_state::before_command()
wsrep::server_state::rm_async); wsrep::server_state::rm_async);
override_error(wsrep::e_deadlock_error); override_error(wsrep::e_deadlock_error);
lock.unlock(); lock.unlock();
client_service_.rollback(*this); client_service_.rollback();
(void)transaction_.after_statement(); (void)transaction_.after_statement();
lock.lock(); lock.lock();
assert(transaction_.state() == assert(transaction_.state() ==
@ -96,7 +96,7 @@ void wsrep::client_state::after_command_before_result()
{ {
override_error(wsrep::e_deadlock_error); override_error(wsrep::e_deadlock_error);
lock.unlock(); lock.unlock();
client_service_.rollback(*this); client_service_.rollback();
(void)transaction_.after_statement(); (void)transaction_.after_statement();
lock.lock(); lock.lock();
assert(transaction_.state() == wsrep::transaction::s_aborted); assert(transaction_.state() == wsrep::transaction::s_aborted);
@ -116,7 +116,7 @@ void wsrep::client_state::after_command_after_result()
transaction_.state() == wsrep::transaction::s_must_abort) transaction_.state() == wsrep::transaction::s_must_abort)
{ {
lock.unlock(); lock.unlock();
client_service_.rollback(*this); client_service_.rollback();
lock.lock(); lock.lock();
assert(transaction_.state() == wsrep::transaction::s_aborted); assert(transaction_.state() == wsrep::transaction::s_aborted);
override_error(wsrep::e_deadlock_error); override_error(wsrep::e_deadlock_error);

View File

@ -709,7 +709,7 @@ int wsrep::transaction::certify_fragment(
lock.unlock(); lock.unlock();
wsrep::mutable_buffer data; wsrep::mutable_buffer data;
if (client_state_.prepare_fragment_for_replication(*this, data)) if (client_state_.prepare_fragment_for_replication(data))
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
@ -807,7 +807,7 @@ int wsrep::transaction::certify_commit(
lock.unlock(); lock.unlock();
if (client_state_.prepare_data_for_replication(*this)) if (client_state_.prepare_data_for_replication())
{ {
// Note: Error must be set by prepare_data_for_replication() // Note: Error must be set by prepare_data_for_replication()
lock.lock(); lock.lock();

View File

@ -17,7 +17,7 @@ namespace
{ {
replicating_client_fixture_sync_rm() replicating_client_fixture_sync_rm()
: sc("s1", "s1", wsrep::server_state::rm_sync) : sc("s1", "s1", wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
@ -28,7 +28,7 @@ namespace
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -36,7 +36,7 @@ namespace
{ {
replicating_client_fixture_async_rm() replicating_client_fixture_async_rm()
: sc("s1", "s1", wsrep::server_state::rm_async) : sc("s1", "s1", wsrep::server_state::rm_async)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
@ -47,7 +47,7 @@ namespace
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -55,11 +55,11 @@ namespace
{ {
replicating_client_fixture_2pc() replicating_client_fixture_2pc()
: sc("s1", "s1", wsrep::server_state::rm_sync) : sc("s1", "s1", wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.client_service().do_2pc_ = true; cc.do_2pc_ = true;
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
// Verify initial state // Verify initial state
@ -67,7 +67,7 @@ namespace
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -75,11 +75,11 @@ namespace
{ {
replicating_client_fixture_autocommit() replicating_client_fixture_autocommit()
: sc("s1", "s1", wsrep::server_state::rm_sync) : sc("s1", "s1", wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.client_service().is_autocommit_ = true; cc.is_autocommit_ = true;
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
// Verify initial state // Verify initial state
@ -87,7 +87,7 @@ namespace
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -96,7 +96,7 @@ namespace
applying_client_fixture() applying_client_fixture()
: sc("s1", "s1", : sc("s1", "s1",
wsrep::server_state::rm_async) wsrep::server_state::rm_async)
, cc(sc, sc.client_service(), , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority) wsrep::client_state::m_high_priority)
, tc(cc.transaction()) , tc(cc.transaction())
@ -115,7 +115,7 @@ namespace
BOOST_REQUIRE(tc.ordered() == true); BOOST_REQUIRE(tc.ordered() == true);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -124,12 +124,12 @@ namespace
applying_client_fixture_2pc() applying_client_fixture_2pc()
: sc("s1", "s1", : sc("s1", "s1",
wsrep::server_state::rm_async) wsrep::server_state::rm_async)
, cc(sc, sc.client_service(), , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority) wsrep::client_state::m_high_priority)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.client_service().do_2pc_ = true; cc.do_2pc_ = true;
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
wsrep::ws_handle ws_handle(1, (void*)1); wsrep::ws_handle ws_handle(1, (void*)1);
@ -144,7 +144,7 @@ namespace
BOOST_REQUIRE(tc.ordered() == true); BOOST_REQUIRE(tc.ordered() == true);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -152,7 +152,8 @@ namespace
{ {
streaming_client_fixture_row() streaming_client_fixture_row()
: sc("s1", "s1", wsrep::server_state::rm_sync) : sc("s1", "s1", wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
@ -164,7 +165,7 @@ namespace
cc.enable_streaming(wsrep::streaming_context::row, 1); cc.enable_streaming(wsrep::streaming_context::row, 1);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -172,7 +173,8 @@ namespace
{ {
streaming_client_fixture_byte() streaming_client_fixture_byte()
: sc("s1", "s1", wsrep::server_state::rm_sync) : sc("s1", "s1", wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
@ -184,7 +186,7 @@ namespace
cc.enable_streaming(wsrep::streaming_context::bytes, 1); cc.enable_streaming(wsrep::streaming_context::bytes, 1);
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
@ -192,7 +194,8 @@ namespace
{ {
streaming_client_fixture_statement() streaming_client_fixture_statement()
: sc("s1", "s1", wsrep::server_state::rm_sync) : sc("s1", "s1", wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), wsrep::client_id(1), , cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_replicating) wsrep::client_state::m_replicating)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
@ -205,7 +208,7 @@ namespace
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
const wsrep::transaction& tc; const wsrep::transaction& tc;
}; };
} }

View File

@ -7,48 +7,47 @@
int wsrep::mock_client_service::apply( int wsrep::mock_client_service::apply(
wsrep::client_state& client_state WSREP_UNUSED,
const wsrep::const_buffer&) const wsrep::const_buffer&)
{ {
assert(client_state.transaction().state() == wsrep::transaction::s_executing || assert(client_state_.transaction().state() == wsrep::transaction::s_executing ||
client_state.transaction().state() == wsrep::transaction::s_replaying); client_state_.transaction().state() == wsrep::transaction::s_replaying);
return (fail_next_applying_ ? 1 : 0); return (fail_next_applying_ ? 1 : 0);
} }
int wsrep::mock_client_service::commit(wsrep::client_state& client_state, const wsrep::ws_handle&, const wsrep::ws_meta&) int wsrep::mock_client_service::commit(
const wsrep::ws_handle&, const wsrep::ws_meta&)
{ {
int ret(0); int ret(0);
if (do_2pc()) if (do_2pc())
{ {
if (client_state.before_prepare()) if (client_state_.before_prepare())
{ {
ret = 1; ret = 1;
} }
else if (client_state.after_prepare()) else if (client_state_.after_prepare())
{ {
ret = 1; ret = 1;
} }
} }
if (ret == 0 && if (ret == 0 &&
(client_state.before_commit() || (client_state_.before_commit() ||
client_state.ordered_commit() || client_state_.ordered_commit() ||
client_state.after_commit())) client_state_.after_commit()))
{ {
ret = 1; ret = 1;
} }
return ret; return ret;
} }
int wsrep::mock_client_service::rollback( int wsrep::mock_client_service::rollback()
wsrep::client_state& client_state)
{ {
int ret(0); int ret(0);
if (client_state.before_rollback()) if (client_state_.before_rollback())
{ {
ret = 1; ret = 1;
} }
else if (client_state.after_rollback()) else if (client_state_.after_rollback())
{ {
ret = 1; ret = 1;
} }

View File

@ -42,8 +42,8 @@ namespace wsrep
class mock_client_service : public wsrep::client_service class mock_client_service : public wsrep::client_service
{ {
public: public:
mock_client_service(wsrep::provider& provider) mock_client_service(wsrep::mock_client_state& client_state)
: wsrep::client_service(provider) : wsrep::client_service(client_state.provider())
, is_autocommit_() , is_autocommit_()
, do_2pc_() , do_2pc_()
, fail_next_applying_() , fail_next_applying_()
@ -53,16 +53,17 @@ namespace wsrep
, sync_point_enabled_() , sync_point_enabled_()
, sync_point_action_() , sync_point_action_()
, bytes_generated_() , bytes_generated_()
, client_state_(client_state)
, replays_() , replays_()
, aborts_() , aborts_()
{ } { }
int apply(wsrep::client_state&, const wsrep::const_buffer&) WSREP_OVERRIDE; int apply(const wsrep::const_buffer&) WSREP_OVERRIDE;
int commit(wsrep::client_state&, const wsrep::ws_handle&, const wsrep::ws_meta&) int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE; WSREP_OVERRIDE;
int rollback(wsrep::client_state&) WSREP_OVERRIDE; int rollback() WSREP_OVERRIDE;
bool is_autocommit() const WSREP_OVERRIDE bool is_autocommit() const WSREP_OVERRIDE
{ return is_autocommit_; } { return is_autocommit_; }
@ -77,39 +78,37 @@ namespace wsrep
void emergency_shutdown() WSREP_OVERRIDE { ++aborts_; } void emergency_shutdown() WSREP_OVERRIDE { ++aborts_; }
int append_fragment(const wsrep::transaction&, int append_fragment(const wsrep::transaction&,
int, const wsrep::const_buffer&) WSREP_OVERRIDE int,
const wsrep::const_buffer&) WSREP_OVERRIDE
{ return 0; } { return 0; }
void remove_fragments(const wsrep::transaction& ) void remove_fragments()
WSREP_OVERRIDE { }
void will_replay(const wsrep::transaction&)
WSREP_OVERRIDE { } WSREP_OVERRIDE { }
void will_replay() WSREP_OVERRIDE { }
enum wsrep::provider::status enum wsrep::provider::status
replay(wsrep::client_state& client_state, replay() WSREP_OVERRIDE
wsrep::transaction& tc) WSREP_OVERRIDE
{ {
enum wsrep::provider::status ret( enum wsrep::provider::status ret(
provider_.replay(tc.ws_handle(), &client_state)); provider_.replay(const_cast<wsrep::ws_handle&>(
client_state_.transaction().ws_handle()),
&client_state_));
++replays_; ++replays_;
return ret; return ret;
} }
void wait_for_replayers( void wait_for_replayers(
wsrep::client_state& client_state,
wsrep::unique_lock<wsrep::mutex>& lock) wsrep::unique_lock<wsrep::mutex>& lock)
WSREP_OVERRIDE WSREP_OVERRIDE
{ {
lock.unlock(); lock.unlock();
if (bf_abort_during_wait_) if (bf_abort_during_wait_)
{ {
wsrep_test::bf_abort_unordered(client_state); wsrep_test::bf_abort_unordered(client_state_);
} }
lock.lock(); lock.lock();
} }
int prepare_data_for_replication( int prepare_data_for_replication() WSREP_OVERRIDE
wsrep::client_state& client_state,
const wsrep::transaction&) WSREP_OVERRIDE
{ {
if (error_during_prepare_data_) if (error_during_prepare_data_)
{ {
@ -117,7 +116,7 @@ namespace wsrep
} }
static const char buf[1] = { 1 }; static const char buf[1] = { 1 };
wsrep::const_buffer data = wsrep::const_buffer(buf, 1); wsrep::const_buffer data = wsrep::const_buffer(buf, 1);
return client_state.append_data(data); return client_state_.append_data(data);
} }
size_t bytes_generated() const size_t bytes_generated() const
@ -125,10 +124,7 @@ namespace wsrep
return bytes_generated_; return bytes_generated_;
} }
int prepare_fragment_for_replication( int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer)
wsrep::client_state& client_state,
const wsrep::transaction&,
wsrep::mutable_buffer& buffer)
WSREP_OVERRIDE WSREP_OVERRIDE
{ {
if (error_during_prepare_data_) if (error_during_prepare_data_)
@ -138,23 +134,22 @@ namespace wsrep
static const char buf[1] = { 1 }; static const char buf[1] = { 1 };
buffer.push_back(&buf[0], &buf[1]); buffer.push_back(&buf[0], &buf[1]);
wsrep::const_buffer data(buffer.data(), buffer.size()); wsrep::const_buffer data(buffer.data(), buffer.size());
return client_state.append_data(data); return client_state_.append_data(data);
} }
void store_globals() WSREP_OVERRIDE { } void store_globals() WSREP_OVERRIDE { }
void debug_sync(wsrep::client_state& client_state, void debug_sync(const char* sync_point) WSREP_OVERRIDE
const char* sync_point) WSREP_OVERRIDE
{ {
if (sync_point_enabled_ == sync_point) if (sync_point_enabled_ == sync_point)
{ {
switch (sync_point_action_) switch (sync_point_action_)
{ {
case spa_bf_abort_unordered: case spa_bf_abort_unordered:
wsrep_test::bf_abort_unordered(client_state); wsrep_test::bf_abort_unordered(client_state_);
break; break;
case spa_bf_abort_ordered: case spa_bf_abort_ordered:
wsrep_test::bf_abort_ordered(client_state); wsrep_test::bf_abort_ordered(client_state_);
break; break;
} }
} }
@ -190,10 +185,23 @@ namespace wsrep
size_t aborts() const { return aborts_; } size_t aborts() const { return aborts_; }
private: private:
wsrep::mock_client_state& client_state_;
size_t replays_; size_t replays_;
size_t aborts_; size_t aborts_;
}; };
class mock_client
: public mock_client_state
, public mock_client_service
{
public:
mock_client(wsrep::server_state& server_state,
const wsrep::client_id& id,
enum wsrep::client_state::mode mode)
: mock_client_state(server_state, *this, id, mode)
, mock_client_service(static_cast<mock_client_state&>(*this))
{ }
};
} }
#endif // WSREP_MOCK_CLIENT_CONTEXT_HPP #endif // WSREP_MOCK_CLIENT_CONTEXT_HPP

View File

@ -24,21 +24,20 @@ namespace wsrep
, mutex_() , mutex_()
, cond_() , cond_()
, provider_(*this) , provider_(*this)
, client_service_(provider_)
, last_client_id_(0) , last_client_id_(0)
{ } { }
wsrep::mock_provider& provider() const wsrep::mock_provider& provider() const
{ return provider_; } { return provider_; }
wsrep::client_state* local_client_state() wsrep::client_state* local_client_state()
{ {
return new wsrep::mock_client_state( return new wsrep::mock_client(
*this, client_service_, ++last_client_id_, *this, ++last_client_id_,
wsrep::client_state::m_local); wsrep::client_state::m_local);
} }
wsrep::client_state* streaming_applier_client_state() wsrep::client_state* streaming_applier_client_state()
{ {
return new wsrep::mock_client_state( return new wsrep::mock_client(
*this, client_service_, ++last_client_id_, *this, ++last_client_id_,
wsrep::client_state::m_high_priority); wsrep::client_state::m_high_priority);
} }
void release_client_state(wsrep::client_state* client_state) void release_client_state(wsrep::client_state* client_state)
@ -66,18 +65,10 @@ namespace wsrep
client_state.before_rollback(); client_state.before_rollback();
client_state.after_rollback(); client_state.after_rollback();
} }
// void sst_received(const wsrep_gtid_t&, int) WSREP_OVERRIDE { }
// void on_apply(wsrep::transaction&) { }
// void on_commit(wsrep::transaction&) { }
wsrep::mock_client_service& client_service()
{
return client_service_;
}
private: private:
wsrep::default_mutex mutex_; wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_; wsrep::default_condition_variable cond_;
mutable wsrep::mock_provider provider_; mutable wsrep::mock_provider provider_;
wsrep::mock_client_service client_service_;
unsigned long long last_client_id_; unsigned long long last_client_id_;
}; };
} }

View File

@ -13,7 +13,7 @@ namespace
applying_server_fixture() applying_server_fixture()
: sc("s1", "s1", : sc("s1", "s1",
wsrep::server_state::rm_sync) wsrep::server_state::rm_sync)
, cc(sc, sc.client_service(), , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority) wsrep::client_state::m_high_priority)
, ws_handle(1, (void*)1) , ws_handle(1, (void*)1)
@ -25,7 +25,7 @@ namespace
{ {
} }
wsrep::mock_server_state sc; wsrep::mock_server_state sc;
wsrep::mock_client_state cc; wsrep::mock_client cc;
wsrep::ws_handle ws_handle; wsrep::ws_handle ws_handle;
wsrep::ws_meta ws_meta; wsrep::ws_meta ws_meta;
}; };
@ -61,7 +61,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc,
BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback, BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback,
applying_server_fixture) applying_server_fixture)
{ {
sc.client_service().fail_next_applying_ = true; cc.fail_next_applying_ = true;
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 1); wsrep::const_buffer(buf, 1)) == 1);
@ -74,7 +74,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback,
BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback, BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback,
applying_server_fixture) applying_server_fixture)
{ {
sc.client_service().fail_next_applying_ = true; cc.fail_next_applying_ = true;
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 1); wsrep::const_buffer(buf, 1)) == 1);
@ -86,10 +86,9 @@ BOOST_AUTO_TEST_CASE(server_state_streaming)
{ {
wsrep::mock_server_state sc("s1", "s1", wsrep::mock_server_state sc("s1", "s1",
wsrep::server_state::rm_sync); wsrep::server_state::rm_sync);
wsrep::mock_client_state cc(sc, wsrep::mock_client cc(sc,
sc.client_service(), wsrep::client_id(1),
wsrep::client_id(1), wsrep::client_state::m_high_priority);
wsrep::client_state::m_high_priority);
wsrep::ws_handle ws_handle(1, (void*)1); wsrep::ws_handle ws_handle(1, (void*)1);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(wsrep::id("1"), 1, 1), wsrep::stid(wsrep::id("1"), 1, 1),

View File

@ -43,7 +43,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_append_key_data,
BOOST_FIXTURE_TEST_CASE_TEMPLATE(transaction_1pc, T, BOOST_FIXTURE_TEST_CASE_TEMPLATE(transaction_1pc, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
cc.start_transaction(1); cc.start_transaction(1);
@ -85,7 +85,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(transaction_1pc, T,
BOOST_FIXTURE_TEST_CASE_TEMPLATE(transaction_rollback, T, BOOST_FIXTURE_TEST_CASE_TEMPLATE(transaction_rollback, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -117,7 +117,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
transaction_1pc_bf_before_before_commit, T, transaction_1pc_bf_before_before_commit, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -159,7 +159,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -198,8 +198,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
transaction_1pc_bf_during_commit_wait_for_replayers, T, transaction_1pc_bf_during_commit_wait_for_replayers, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_client& cc(T::cc);
wsrep::mock_client_state& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -208,7 +207,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
sc.client_service().bf_abort_during_wait_ = true; cc.bf_abort_during_wait_ = true;
// Run before commit // Run before commit
BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(cc.before_commit());
@ -237,8 +236,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
transaction_1pc_error_during_prepare_data, T, transaction_1pc_error_during_prepare_data, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_client& cc(T::cc);
wsrep::mock_client_state& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -247,7 +245,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
sc.client_service().error_during_prepare_data_ = true; cc.error_during_prepare_data_ = true;
// Run before commit // Run before commit
BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(cc.before_commit());
@ -277,8 +275,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
transaction_1pc_killed_before_certify, T, transaction_1pc_killed_before_certify, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_client& cc(T::cc);
wsrep::mock_client_state& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -287,7 +284,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
sc.client_service().killed_before_certify_ = true; cc.killed_before_certify_ = true;
// Run before commit // Run before commit
BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(cc.before_commit());
@ -360,7 +357,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::mock_client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -389,7 +386,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(cc.current_error() == wsrep::e_success); BOOST_REQUIRE(cc.current_error() == wsrep::e_success);
BOOST_REQUIRE(sc.client_service().replays() == 1); BOOST_REQUIRE(cc.replays() == 1);
} }
// //
@ -403,15 +400,15 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::mock_client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
cc.start_transaction(1); cc.start_transaction(1);
BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
sc.client_service().sync_point_enabled_ = "wsrep_before_certification"; cc.sync_point_enabled_ = "wsrep_before_certification";
sc.client_service().sync_point_action_ = wsrep::mock_client_service::spa_bf_abort_unordered; cc.sync_point_action_ = wsrep::mock_client_service::spa_bf_abort_unordered;
sc.provider().certify_result_ = wsrep::provider::error_certification_failed; sc.provider().certify_result_ = wsrep::provider::error_certification_failed;
BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(cc.before_commit());
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_cert_failed); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_cert_failed);
@ -436,7 +433,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -477,7 +474,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -518,7 +515,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -559,7 +556,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -600,7 +597,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -641,7 +638,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::mock_client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -672,7 +669,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(cc.current_error() == wsrep::e_error_during_commit); BOOST_REQUIRE(cc.current_error() == wsrep::e_error_during_commit);
BOOST_REQUIRE(sc.client_service().aborts() == 1); BOOST_REQUIRE(cc.aborts() == 1);
} }
// //
@ -683,7 +680,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_server_state& sc(T::sc);
wsrep::mock_client_state& cc(T::cc); wsrep::mock_client& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -724,8 +721,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
transaction_1pc_bf_abort_before_certify_regain_lock, T, transaction_1pc_bf_abort_before_certify_regain_lock, T,
replicating_fixtures, T) replicating_fixtures, T)
{ {
wsrep::mock_server_state& sc(T::sc); wsrep::mock_client& cc(T::cc);
wsrep::mock_client_state& cc(T::cc);
const wsrep::transaction& tc(T::tc); const wsrep::transaction& tc(T::tc);
// Start a new transaction with ID 1 // Start a new transaction with ID 1
@ -734,8 +730,8 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
sc.client_service().sync_point_enabled_ = "wsrep_after_certification"; cc.sync_point_enabled_ = "wsrep_after_certification";
sc.client_service().sync_point_action_ = wsrep::mock_client_service::spa_bf_abort_ordered; cc.sync_point_action_ = wsrep::mock_client_service::spa_bf_abort_ordered;
// Run before commit // Run before commit
BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(cc.before_commit());
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_replay); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_replay);
@ -750,7 +746,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
// Cleanup after statement // Cleanup after statement
cc.after_statement(); cc.after_statement();
BOOST_REQUIRE(sc.client_service().replays() == 1); BOOST_REQUIRE(cc.replays() == 1);
BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.certified() == false);
@ -1129,7 +1125,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_streaming_1pc_commit,
streaming_client_fixture_byte) streaming_client_fixture_byte)
{ {
BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0);
sc.client_service().bytes_generated_ = 1; cc.bytes_generated_ = 1;
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);
@ -1148,10 +1144,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_byte_batch_streaming_1pc_commit,
cc.enable_streaming( cc.enable_streaming(
wsrep::streaming_context::bytes, 2) == 0); wsrep::streaming_context::bytes, 2) == 0);
BOOST_REQUIRE(cc.start_transaction(1) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0);
sc.client_service().bytes_generated_ = 1; cc.bytes_generated_ = 1;
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0);
sc.client_service().bytes_generated_ = 2; cc.bytes_generated_ = 2;
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1);
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);