1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Refactored high priority service out of client service.

This commit is contained in:
Teemu Ollakka
2018-07-02 18:22:24 +03:00
parent 658a84a7d4
commit 635eaf4c29
28 changed files with 556 additions and 261 deletions

View File

@ -5,6 +5,7 @@
add_executable(dbsim add_executable(dbsim
db_client.cpp db_client.cpp
db_client_service.cpp db_client_service.cpp
db_high_priority_service.cpp
db_params.cpp db_params.cpp
db_server.cpp db_server.cpp
db_server_service.cpp db_server_service.cpp

View File

@ -11,7 +11,7 @@
#include "db_storage_engine.hpp" #include "db_storage_engine.hpp"
#include "db_client_state.hpp" #include "db_client_state.hpp"
#include "db_client_service.hpp" #include "db_client_service.hpp"
#include "db_high_priority_service.hpp"
namespace db namespace db
{ {
@ -47,6 +47,7 @@ namespace db
private: private:
friend class db::server_state; friend class db::server_state;
friend class db::client_service; friend class db::client_service;
friend class db::high_priority_service;
template <class F> int client_command(F f); template <class F> int client_command(F f);
void run_one_transaction(); void run_one_transaction();
void reset_error(); void reset_error();

View File

@ -3,20 +3,9 @@
// //
#include "db_client_service.hpp" #include "db_client_service.hpp"
#include "db_high_priority_service.hpp"
#include "db_client.hpp" #include "db_client.hpp"
int db::client_service::apply_write_set(const wsrep::const_buffer&)
{
db::client* client(client_state_.client());
client->se_trx_.start(client);
client->se_trx_.apply(client_state_.transaction());
return 0;
}
int db::client_service::apply_toi(const wsrep::const_buffer&)
{
return 0;
}
int db::client_service::commit(const wsrep::ws_handle&, int db::client_service::commit(const wsrep::ws_handle&,
const wsrep::ws_meta&) const wsrep::ws_meta&)
@ -44,9 +33,11 @@ enum wsrep::provider::status
db::client_service::replay() db::client_service::replay()
{ {
wsrep::high_priority_context high_priority_context(client_state_); wsrep::high_priority_context high_priority_context(client_state_);
db::high_priority_service high_priority_service(
client_state_.client()->server_, client_state_.client());
auto ret(client_state_.provider().replay( auto ret(client_state_.provider().replay(
client_state_.transaction().ws_handle(), client_state_.transaction().ws_handle(),
&client_state_)); &high_priority_service));
if (ret == wsrep::provider::success) if (ret == wsrep::provider::success)
{ {
++client_state_.client()->stats_.replays; ++client_state_.client()->stats_.replays;

View File

@ -69,9 +69,9 @@ namespace db
void remove_fragments() override void remove_fragments() override
{ } { }
int apply_write_set(const wsrep::const_buffer&) override; // int apply_write_set(const wsrep::const_buffer&) override;
int apply_toi(const wsrep::const_buffer&) override; // int apply_toi(const wsrep::const_buffer&) override;
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;

View File

@ -0,0 +1,67 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "db_high_priority_service.hpp"
#include "db_server.hpp"
#include "db_client.hpp"
int db::high_priority_service::apply(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
return server_.server_state().on_apply(*this, ws_handle, ws_meta, data);
}
int db::high_priority_service::start_transaction(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
return client_->client_state().start_transaction(ws_handle, ws_meta);
}
void db::high_priority_service::adopt_transaction(const wsrep::transaction&)
{
throw wsrep::not_implemented_error();
}
int db::high_priority_service::apply_write_set(const wsrep::const_buffer&)
{
client_->se_trx_.start(client_);
client_->se_trx_.apply(client_->client_state().transaction());
return 0;
}
int db::high_priority_service::apply_toi(const wsrep::const_buffer&)
{
throw wsrep::not_implemented_error();
}
int db::high_priority_service::commit()
{
int ret(client_->client_state_.before_commit());
if (ret == 0) client_->se_trx_.commit();
ret = ret || client_->client_state_.ordered_commit();
ret = ret || client_->client_state_.after_commit();
return ret;
}
int db::high_priority_service::rollback()
{
int ret(client_->client_state_.before_rollback());
assert(ret == 0);
client_->se_trx_.rollback();
ret = client_->client_state_.after_rollback();
assert(ret == 0);
return ret;
}
void db::high_priority_service::after_apply()
{
client_->client_state_.after_statement();
}
bool db::high_priority_service::is_replaying() const
{
return (client_->client_state_.transaction().state() == wsrep::transaction::s_replaying);
}

View File

@ -0,0 +1,46 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_HIGH_PRIORITY_SERVICE_HPP
#define WSREP_DB_HIGH_PRIORITY_SERVICE_HPP
#include "wsrep/high_priority_service.hpp"
namespace db
{
class server;
class client;
class high_priority_service : wsrep::high_priority_service
{
public:
high_priority_service(db::server& server, db::client* client)
: server_(server)
, client_(client)
{ }
int apply(const wsrep::ws_handle&, const wsrep::ws_meta&,
const wsrep::const_buffer&) override;
int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) override;
void adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::const_buffer&) override;
int commit() override;
int rollback() override;
int apply_toi(const wsrep::const_buffer&) override;
void after_apply() override;
void store_globals() override { }
void reset_globals() override { }
int log_dummy_write_set(const wsrep::ws_handle&,
const wsrep::ws_meta&)
{ return 0; }
bool is_replaying() const;
private:
high_priority_service(const high_priority_service&);
high_priority_service& operator=(const high_priority_service&);
db::server& server_;
db::client* client_;
};
}
#endif // WSREP_DB_HIGH_PRIORITY_SERVICE_HPP

View File

@ -4,6 +4,7 @@
#include "db_server.hpp" #include "db_server.hpp"
#include "db_server_service.hpp" #include "db_server_service.hpp"
#include "db_high_priority_service.hpp"
#include "db_client.hpp" #include "db_client.hpp"
#include "db_simulator.hpp" #include "db_simulator.hpp"
@ -34,11 +35,17 @@ void db::server::applier_thread()
wsrep::client_state::m_high_priority, wsrep::client_state::m_high_priority,
simulator_.params()); simulator_.params());
wsrep::client_state* cc(static_cast<wsrep::client_state*>( wsrep::client_state* cc(static_cast<wsrep::client_state*>(
&applier.client_state())); &applier.client_state()));
db::high_priority_service hps(*this, &applier);
cc->open(cc->id()); cc->open(cc->id());
cc->before_command();
enum wsrep::provider::status ret( enum wsrep::provider::status ret(
server_state_.provider().run_applier(cc)); server_state_.provider().run_applier(&hps));
wsrep::log_info() << "Applier thread exited with error code " << ret; wsrep::log_info() << "Applier thread exited with error code " << ret;
cc->after_command_before_result();
cc->after_command_after_result();
cc->close();
cc->cleanup();
} }
void db::server::start_applier() void db::server::start_applier()
@ -112,15 +119,15 @@ wsrep::client_state* db::server::local_client_state()
simulator_.params())); simulator_.params()));
return &client->client_state(); return &client->client_state();
} }
wsrep::client_state* db::server::streaming_applier_client_state()
{
throw wsrep::not_implemented_error();
}
void db::server::release_client_state(wsrep::client_state* client_state) void db::server::release_client_state(wsrep::client_state* client_state)
{ {
db::client_state* db_client_state( db::client_state* db_client_state(
dynamic_cast<db::client_state*>(client_state)); dynamic_cast<db::client_state*>(client_state));
delete db_client_state->client(); delete db_client_state->client();
} }
wsrep::high_priority_service* db::server::streaming_applier_service()
{
throw wsrep::not_implemented_error();
}

View File

@ -42,9 +42,8 @@ namespace db
} }
void donate_sst(const std::string&, const wsrep::gtid&, bool); void donate_sst(const std::string&, const wsrep::gtid&, bool);
wsrep::client_state* local_client_state(); wsrep::client_state* local_client_state();
wsrep::client_state* streaming_applier_client_state();
void release_client_state(wsrep::client_state*); void release_client_state(wsrep::client_state*);
wsrep::high_priority_service* streaming_applier_service();
private: private:
void start_client(size_t id); void start_client(size_t id);

View File

@ -6,6 +6,7 @@
#include "db_server.hpp" #include "db_server.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
#include "wsrep/high_priority_service.hpp"
db::server_service::server_service(db::server& server) db::server_service::server_service(db::server& server)
: server_(server) : server_(server)
@ -16,17 +17,23 @@ wsrep::client_state* db::server_service::local_client_state()
return server_.local_client_state(); return server_.local_client_state();
} }
wsrep::client_state* db::server_service::streaming_applier_client_state()
{
return server_.streaming_applier_client_state();
}
void db::server_service::release_client_state( void db::server_service::release_client_state(
wsrep::client_state* client_state) wsrep::client_state* client_state)
{ {
server_.release_client_state(client_state); server_.release_client_state(client_state);
} }
wsrep::high_priority_service* db::server_service::streaming_applier_service()
{
return server_.streaming_applier_service();
}
void db::server_service::release_high_priority_service(
wsrep::high_priority_service *high_priority_service)
{
delete high_priority_service;
}
bool db::server_service::sst_before_init() const bool db::server_service::sst_before_init() const
{ {
return true; return true;

View File

@ -16,8 +16,10 @@ namespace db
public: public:
server_service(db::server& server); server_service(db::server& server);
wsrep::client_state* local_client_state() override; wsrep::client_state* local_client_state() override;
wsrep::client_state* streaming_applier_client_state() override;
void release_client_state(wsrep::client_state*) override; void release_client_state(wsrep::client_state*) override;
wsrep::high_priority_service* streaming_applier_service() override;
void release_high_priority_service(wsrep::high_priority_service*) override;
bool sst_before_init() const override; bool sst_before_init() const override;
int start_sst(const std::string&, const wsrep::gtid&, bool) override; int start_sst(const std::string&, const wsrep::gtid&, bool) override;
std::string sst_request() override; std::string sst_request() override;

View File

@ -6,16 +6,12 @@
* *
* This file will define a `callback` abstract interface for a * This file will define a `callback` abstract interface for a
* DBMS client session service. The interface will define methods * DBMS client session service. The interface will define methods
* which will be called by the wsrep-lib under certain circumstances, * which will be called by the wsrep-lib.
* for example when a transaction rollback is required by internal
* wsrep-lib operation or applier client needs to apply a write set.
*/ */
#ifndef WSREP_CLIENT_SERVICE_HPP #ifndef WSREP_CLIENT_SERVICE_HPP
#define WSREP_CLIENT_SERVICE_HPP #define WSREP_CLIENT_SERVICE_HPP
#include "transaction_termination_service.hpp"
#include "buffer.hpp" #include "buffer.hpp"
#include "provider.hpp" #include "provider.hpp"
#include "mutex.hpp" #include "mutex.hpp"
@ -24,7 +20,7 @@
namespace wsrep namespace wsrep
{ {
class transaction; class transaction;
class client_service : public wsrep::transaction_termination_service class client_service
{ {
public: public:
client_service() { } client_service() { }
@ -80,28 +76,15 @@ namespace wsrep
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0;
virtual void remove_fragments() = 0; virtual void remove_fragments() = 0;
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
// //
// Applying interface // Rollback
// //
/** /**
* Apply a write set. * Roll back all transactions which are currently active
* * in the client session.
* A write set applying happens always
* as a part of the transaction. The caller must start a
* new transaction before applying a write set and must
* either commit to make changes persistent or roll back.
*/ */
virtual int apply_write_set(const wsrep::const_buffer&) = 0; virtual int rollback() = 0;
/**
* Apply a TOI operation.
*
* TOI operation is a standalone operation and should not
* be executed as a part of a transaction.
*/
virtual int apply_toi(const wsrep::const_buffer&) = 0;
// //
// Interface to global server state // Interface to global server state

View File

@ -391,7 +391,7 @@ namespace wsrep
return transaction_.start_replaying(ws_meta); return transaction_.start_replaying(ws_meta);
} }
void adopt_transaction(wsrep::transaction& transaction) void adopt_transaction(const wsrep::transaction& transaction)
{ {
assert(mode_ == m_high_priority); assert(mode_ == m_high_priority);
transaction_.start_transaction(transaction.id()); transaction_.start_transaction(transaction.id());

View File

@ -0,0 +1,99 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
/** @file high_priority_service.hpp
*
* Interface for services for applying high priority transactions.
*/
#ifndef WSREP_HIGH_PRIORITY_SERVICE_HPP
#define WSREP_HIGH_PRIORITY_SERVICE_HPP
namespace wsrep
{
class ws_handle;
class ws_meta;
class const_buffer;
class transaction;
class high_priority_service
{
public:
virtual ~high_priority_service() { }
virtual int apply(const ws_handle&, const ws_meta&,
const const_buffer&) = 0;
/**
* Start a new transaction
*/
virtual int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
/**
* Adopt a transaction.
*/
virtual void adopt_transaction(const wsrep::transaction&) = 0;
/**
* Apply a write set.
*
* A write set applying happens always
* as a part of the transaction. The caller must start a
* new transaction before applying a write set and must
* either commit to make changes persistent or roll back.
*/
virtual int apply_write_set(const wsrep::const_buffer&) = 0;
/**
* Commit a transaction.
*/
virtual int commit() = 0;
/**
* Roll back a transaction
*/
virtual int rollback() = 0;
/**
* Apply a TOI operation.
*
* TOI operation is a standalone operation and should not
* be executed as a part of a transaction.
*/
virtual int apply_toi(const wsrep::const_buffer&) = 0;
/**
* Actions to take after applying a write set was completed.
*/
virtual void after_apply() = 0;
virtual void store_globals() = 0;
virtual void reset_globals() = 0;
virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0;
virtual bool is_replaying() const = 0;
private:
};
class high_priority_switch
{
public:
high_priority_switch(high_priority_service& orig_service,
high_priority_service& current_service)
: orig_service_(orig_service)
, current_service_(current_service)
{
orig_service_.reset_globals();
current_service_.store_globals();
}
~high_priority_switch()
{
current_service_.reset_globals();
orig_service_.store_globals();
}
private:
high_priority_service& orig_service_;
high_priority_service& current_service_;
};
}
#endif // WSREP_HIGH_PRIORITY_SERVICE_HPP

View File

@ -20,6 +20,7 @@
namespace wsrep namespace wsrep
{ {
class client_state; class client_state;
class high_priority_service;
class ws_meta; class ws_meta;
class gtid; class gtid;
class view; class view;
@ -36,19 +37,21 @@ namespace wsrep
* @return Pointer to Client State. * @return Pointer to Client State.
*/ */
virtual wsrep::client_state* local_client_state() = 0; virtual wsrep::client_state* local_client_state() = 0;
virtual void release_client_state(wsrep::client_state*) = 0;
/** /**
* Create an applier state for streaming transaction applying. * Create an applier state for streaming transaction applying.
* *
* @return Pointer to streaming applier client state. * @return Pointer to streaming applier client state.
*/ */
virtual wsrep::client_state* streaming_applier_client_state() = 0; virtual wsrep::high_priority_service*
streaming_applier_service() = 0;
/** /**
* Release a client state allocated by either local_client_state() * Release a client state allocated by either local_client_state()
* or streaming_applier_client_state(). * or streaming_applier_client_state().
*/ */
virtual void release_client_state(wsrep::client_state*) = 0; virtual void release_high_priority_service(
wsrep::high_priority_service*) = 0;
/** /**
* Perform a background rollback for a transaction. * Perform a background rollback for a transaction.

View File

@ -215,15 +215,16 @@ namespace wsrep
void start_streaming_applier( void start_streaming_applier(
const wsrep::id&, const wsrep::id&,
const wsrep::transaction_id&, const wsrep::transaction_id&,
wsrep::client_state* client_state); wsrep::high_priority_service*);
void stop_streaming_applier( void stop_streaming_applier(
const wsrep::id&, const wsrep::transaction_id&); const wsrep::id&, const wsrep::transaction_id&);
/** /**
* Return reference to streaming applier. * Return reference to streaming applier.
*/ */
client_state* find_streaming_applier(const wsrep::id&, wsrep::high_priority_service* find_streaming_applier(
const wsrep::transaction_id&) const; const wsrep::id&,
const wsrep::transaction_id&) const;
/** /**
* Load WSRep provider. * Load WSRep provider.
* *
@ -429,13 +430,13 @@ namespace wsrep
* *
* @todo Make this private, allow calls for provider implementations * @todo Make this private, allow calls for provider implementations
* only. * only.
* @param client_state Applier client context. * @param high_priority_service High priority applier service.
* @param transaction Transaction context. * @param transaction Transaction context.
* @param data Write set data * @param data Write set data
* *
* @return Zero on success, non-zero on failure. * @return Zero on success, non-zero on failure.
*/ */
int on_apply(wsrep::client_state& client_state, int on_apply(wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data); const wsrep::const_buffer& data);
@ -555,7 +556,7 @@ namespace wsrep
size_t pause_count_; size_t pause_count_;
wsrep::seqno pause_seqno_; wsrep::seqno pause_seqno_;
bool desynced_on_pause_; bool desynced_on_pause_;
typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>, wsrep::client_state*> streaming_appliers_map; typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>, wsrep::high_priority_service*> streaming_appliers_map;
streaming_appliers_map streaming_appliers_; streaming_appliers_map streaming_appliers_;
wsrep::provider* provider_; wsrep::provider* provider_;
std::string name_; std::string name_;

View File

@ -1,21 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_TRANSACTION_TERMINATION_SERVICE_HPP
#define WSREP_TRANSACTION_TERMINATION_SERVICE_HPP
namespace wsrep
{
class ws_handle;
class ws_meta;
class transaction_termination_service
{
public:
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
virtual int rollback() = 0;
};
}
#endif // WSREP_TRANSACTION_TERMINATION_SERVICE_HPP

View File

@ -3,7 +3,7 @@
// //
#include "wsrep/server_state.hpp" #include "wsrep/server_state.hpp"
#include "wsrep/client_state.hpp" #include "wsrep/high_priority_service.hpp"
#include "wsrep/transaction.hpp" #include "wsrep/transaction.hpp"
#include "wsrep/view.hpp" #include "wsrep/view.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
@ -56,126 +56,95 @@ namespace
} }
int apply_write_set(wsrep::server_state& server_state, int apply_write_set(wsrep::server_state& server_state,
wsrep::client_state& client_state, wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data)
{ {
int ret(0); int ret(0);
const wsrep::transaction& txc(client_state.transaction());
assert(client_state.mode() == wsrep::client_state::m_high_priority);
bool not_replaying(txc.state() !=
wsrep::transaction::s_replaying);
if (wsrep::starts_transaction(ws_meta.flags()) && if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags())) wsrep::commits_transaction(ws_meta.flags()))
{ {
if (not_replaying) if (high_priority_service.start_transaction(ws_handle, ws_meta))
{
client_state.before_command();
client_state.before_statement();
assert(txc.active() == false);
client_state.start_transaction(ws_handle, ws_meta);
}
else
{
client_state.start_replaying(ws_meta);
}
if (client_state.client_service().apply_write_set(data))
{ {
ret = 1; ret = 1;
} }
else if (client_state.client_service().commit(ws_handle, ws_meta)) else if (high_priority_service.apply_write_set(data))
{
ret = 1;
}
else if (high_priority_service.commit())
{ {
ret = 1; ret = 1;
} }
if (ret) if (ret)
{ {
client_state.client_service().rollback(); high_priority_service.rollback();
} }
high_priority_service.after_apply();
if (not_replaying)
{
client_state.after_statement();
client_state.after_command_before_result();
client_state.after_command_after_result();
}
assert(ret ||
txc.state() == wsrep::transaction::s_committed);
} }
else if (wsrep::starts_transaction(ws_meta.flags())) else if (wsrep::starts_transaction(ws_meta.flags()))
{ {
assert(not_replaying);
assert(server_state.find_streaming_applier( assert(server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0); ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::client_state* sac( wsrep::high_priority_service* sa(
server_state.server_service().streaming_applier_client_state()); server_state.server_service().streaming_applier_service());
server_state.start_streaming_applier( server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sac); ws_meta.server_id(), ws_meta.transaction_id(), sa);
sac->start_transaction(ws_handle, ws_meta); sa->start_transaction(ws_handle, ws_meta);
{ {
// TODO: Client context switch will be ultimately wsrep::high_priority_switch sw(high_priority_service, *sa);
// implemented by the application. Therefore need to sa->apply_write_set(data);
// introduce a virtual method call which takes sa->after_apply();
// both original and sac client contexts as argument
// and a functor which does the applying.
wsrep::client_state_switch sw(client_state, *sac);
sac->before_command();
sac->before_statement();
sac->client_service().apply_write_set(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
} }
server_state.server_service().log_dummy_write_set(client_state, ws_meta); high_priority_service.log_dummy_write_set(ws_handle, ws_meta);
} }
else if (ws_meta.flags() == 0) else if (ws_meta.flags() == 0)
{ {
wsrep::client_state* sac( wsrep::high_priority_service* sa(
server_state.find_streaming_applier( server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id())); ws_meta.server_id(), ws_meta.transaction_id()));
if (sac == 0) if (sa == 0)
{ {
// It is possible that rapid group membership changes // It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before // may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid // commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as // situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too. // it may be an indication of a bug too.
assert(0);
wsrep::log_warning() << "Could not find applier context for " wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id() << ws_meta.server_id()
<< ": " << ws_meta.transaction_id(); << ": " << ws_meta.transaction_id();
} }
else else
{ {
wsrep::client_state_switch(client_state, *sac); wsrep::high_priority_switch(high_priority_service, *sa);
sac->before_command(); ret = sa->apply_write_set(data);
sac->before_statement(); sa->after_apply();
ret = sac->client_service().apply_write_set(data);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
} }
server_state.server_service().log_dummy_write_set( high_priority_service.log_dummy_write_set(
client_state, ws_meta); ws_handle, ws_meta);
} }
else if (wsrep::commits_transaction(ws_meta.flags())) else if (wsrep::commits_transaction(ws_meta.flags()))
{ {
if (not_replaying) if (high_priority_service.is_replaying())
{ {
wsrep::client_state* sac( ret = high_priority_service.apply_write_set(data) ||
high_priority_service.commit();
}
else
{
wsrep::high_priority_service* sa(
server_state.find_streaming_applier( server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id())); ws_meta.server_id(), ws_meta.transaction_id()));
assert(sac); if (sa == 0)
if (sac == 0)
{ {
// It is possible that rapid group membership changes // It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before // may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid // commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as // situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too. // it may be an indication of a bug too.
assert(0);
wsrep::log_warning() wsrep::log_warning()
<< "Could not find applier context for " << "Could not find applier context for "
<< ws_meta.server_id() << ws_meta.server_id()
@ -183,40 +152,25 @@ namespace
} }
else else
{ {
wsrep::client_state_switch(client_state, *sac); wsrep::high_priority_switch(high_priority_service, *sa);
sac->before_command(); ret = sa->commit();
sac->before_statement(); sa->after_apply();
ret = sac->client_service().commit(ws_handle, ws_meta);
sac->after_statement();
sac->after_command_before_result();
sac->after_command_after_result();
server_state.stop_streaming_applier( server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()); ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_client_state(sac); server_state.server_service().release_high_priority_service(sa);
} }
} }
else
{
ret = client_state.start_replaying(ws_meta) ||
client_state.client_service().apply_write_set(
wsrep::const_buffer()) ||
client_state.client_service().commit(ws_handle, ws_meta);
}
} }
else else
{ {
// SR fragment applying not implemented yet // SR fragment applying not implemented yet
assert(0); assert(0);
} }
if (not_replaying)
{
assert(txc.active() == false);
}
return ret; return ret;
} }
int apply_toi(wsrep::provider& provider, int apply_toi(wsrep::provider& provider,
wsrep::client_state& client_state, wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data)
@ -226,9 +180,7 @@ namespace
{ {
// Regular toi // Regular toi
provider.commit_order_enter(ws_handle, ws_meta); provider.commit_order_enter(ws_handle, ws_meta);
client_state.enter_toi(ws_meta); int ret(high_priority_service.apply_toi(data));
int ret(client_state.client_service().apply_toi(data));
client_state.leave_toi();
provider.commit_order_leave(ws_handle, ws_meta); provider.commit_order_leave(ws_handle, ws_meta);
return ret; return ret;
} }
@ -668,7 +620,7 @@ void wsrep::server_state::on_sync()
} }
int wsrep::server_state::on_apply( int wsrep::server_state::on_apply(
wsrep::client_state& client_state, wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data)
@ -682,7 +634,8 @@ int wsrep::server_state::on_apply(
} }
if (is_toi(ws_meta.flags())) if (is_toi(ws_meta.flags()))
{ {
return apply_toi(provider(), client_state, ws_handle, ws_meta, data); return apply_toi(provider(), high_priority_service,
ws_handle, ws_meta, data);
} }
else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags())) else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags()))
{ {
@ -692,7 +645,7 @@ int wsrep::server_state::on_apply(
} }
else else
{ {
return apply_write_set(*this, client_state, return apply_write_set(*this, high_priority_service,
ws_handle, ws_meta, data); ws_handle, ws_meta, data);
} }
} }
@ -700,14 +653,13 @@ int wsrep::server_state::on_apply(
void wsrep::server_state::start_streaming_applier( void wsrep::server_state::start_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id, const wsrep::transaction_id& transaction_id,
wsrep::client_state* client_state) wsrep::high_priority_service* sa)
{ {
if (streaming_appliers_.insert( if (streaming_appliers_.insert(
std::make_pair(std::make_pair(server_id, transaction_id), std::make_pair(std::make_pair(server_id, transaction_id),
client_state)).second == false) sa)).second == false)
{ {
wsrep::log_error() << "Could not insert streaming applier"; wsrep::log_error() << "Could not insert streaming applier";
delete client_state;
throw wsrep::fatal_error(); throw wsrep::fatal_error();
} }
} }
@ -730,7 +682,7 @@ void wsrep::server_state::stop_streaming_applier(
} }
} }
wsrep::client_state* wsrep::server_state::find_streaming_applier( wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id) const const wsrep::transaction_id& transaction_id) const
{ {

View File

@ -5,6 +5,7 @@
#include "wsrep/transaction.hpp" #include "wsrep/transaction.hpp"
#include "wsrep/client_state.hpp" #include "wsrep/client_state.hpp"
#include "wsrep/server_state.hpp" #include "wsrep/server_state.hpp"
#include "wsrep/high_priority_service.hpp"
#include "wsrep/key.hpp" #include "wsrep/key.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
#include "wsrep/compiler.hpp" #include "wsrep/compiler.hpp"
@ -70,15 +71,22 @@ int wsrep::transaction::start_transaction(
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta) const wsrep::ws_meta& ws_meta)
{ {
assert(ws_meta.flags()); if (state() != s_replaying)
assert(active() == false); {
id_ = ws_meta.transaction_id(); assert(ws_meta.flags());
assert(client_state_.mode() == wsrep::client_state::m_high_priority); assert(active() == false);
state_ = s_executing; id_ = ws_meta.transaction_id();
state_hist_.clear(); assert(client_state_.mode() == wsrep::client_state::m_high_priority);
ws_handle_ = ws_handle; state_ = s_executing;
ws_meta_ = ws_meta; state_hist_.clear();
certified_ = true; ws_handle_ = ws_handle;
ws_meta_ = ws_meta;
certified_ = true;
}
else
{
start_replaying(ws_meta);
}
return 0; return 0;
} }
@ -341,7 +349,14 @@ int wsrep::transaction::ordered_commit()
assert(state() == s_committing); assert(state() == s_committing);
assert(ordered()); assert(ordered());
int ret(provider().commit_order_leave(ws_handle_, ws_meta_)); int ret(provider().commit_order_leave(ws_handle_, ws_meta_));
// Should always succeed // Should always succeed:
// 1) If before commit before succeeds, the transaction handle
// in the provider is guaranteed to exist and the commit
// has been ordered
// 2) The transaction which has been ordered for commit cannot be BF
// aborted anymore
// 3) The provider should always guarantee that the transactions which
// have been ordered for commit can finish committing.
assert(ret == 0); assert(ret == 0);
state(lock, s_ordered_commit); state(lock, s_ordered_commit);
debug_log_state("ordered_commit_leave"); debug_log_state("ordered_commit_leave");
@ -939,11 +954,11 @@ int wsrep::transaction::certify_commit(
void wsrep::transaction::streaming_rollback() void wsrep::transaction::streaming_rollback()
{ {
assert(streaming_context_.rolled_back() == false); assert(streaming_context_.rolled_back() == false);
wsrep::client_state* sac( wsrep::high_priority_service* sa(
server_service_.streaming_applier_client_state()); server_service_.streaming_applier_service());
client_state_.server_state().start_streaming_applier( client_state_.server_state().start_streaming_applier(
client_state_.server_state().id(), id(), sac); client_state_.server_state().id(), id(), sa);
sac->adopt_transaction(*this); sa->adopt_transaction(*this);
streaming_context_.cleanup(); streaming_context_.cleanup();
// Replicate rollback fragment // Replicate rollback fragment
provider().rollback(id_.get()); provider().rollback(id_.get());

View File

@ -5,7 +5,7 @@
#include "wsrep_provider_v26.hpp" #include "wsrep_provider_v26.hpp"
#include "wsrep/server_state.hpp" #include "wsrep/server_state.hpp"
#include "wsrep/client_state.hpp" #include "wsrep/high_priority_service.hpp"
#include "wsrep/view.hpp" #include "wsrep/view.hpp"
#include "wsrep/exception.hpp" #include "wsrep/exception.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
@ -355,10 +355,9 @@ namespace
const wsrep_trx_meta_t* meta, const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop __attribute__((unused))) wsrep_bool_t* exit_loop __attribute__((unused)))
{ {
wsrep::client_state* client_state( wsrep::high_priority_service* high_priority_service(
reinterpret_cast<wsrep::client_state*>(ctx)); reinterpret_cast<wsrep::high_priority_service*>(ctx));
assert(client_state); assert(high_priority_service);
assert(client_state->mode() == wsrep::client_state::m_high_priority);
wsrep::const_buffer data(buf->ptr, buf->len); wsrep::const_buffer data(buf->ptr, buf->len);
wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque);
@ -373,8 +372,7 @@ namespace
map_flags_from_native(flags)); map_flags_from_native(flags));
try try
{ {
if (client_state->server_state().on_apply( if (high_priority_service->apply(ws_handle, ws_meta, data))
*client_state, ws_handle, ws_meta, data))
{ {
return WSREP_CB_FAILURE; return WSREP_CB_FAILURE;
} }

View File

@ -3,6 +3,7 @@
# #
add_executable(wsrep-lib_test add_executable(wsrep-lib_test
mock_high_priority_service.cpp
mock_client_state.cpp mock_client_state.cpp
test_utils.cpp test_utils.cpp
id_test.cpp id_test.cpp

View File

@ -4,24 +4,7 @@
#include "wsrep/transaction.hpp" #include "wsrep/transaction.hpp"
#include "mock_client_state.hpp" #include "mock_client_state.hpp"
#include "mock_high_priority_service.hpp"
int wsrep::mock_client_service::apply_write_set(
const wsrep::const_buffer&)
{
assert(client_state_.toi_meta().seqno().is_undefined());
assert(client_state_.transaction().state() == wsrep::transaction::s_executing ||
client_state_.transaction().state() == wsrep::transaction::s_replaying);
return (fail_next_applying_ ? 1 : 0);
}
int wsrep::mock_client_service::apply_toi(const wsrep::const_buffer&)
{
assert(client_state_.transaction().active() == false);
assert(client_state_.toi_meta().seqno().is_undefined() == false);
return (fail_next_toi_ ? 1 : 0);
}
int wsrep::mock_client_service::commit( int wsrep::mock_client_service::commit(
const wsrep::ws_handle&, const wsrep::ws_meta&) const wsrep::ws_handle&, const wsrep::ws_meta&)
@ -61,3 +44,16 @@ int wsrep::mock_client_service::rollback()
} }
return ret; return ret;
} }
enum wsrep::provider::status
wsrep::mock_client_service::replay() WSREP_OVERRIDE
{
wsrep::mock_high_priority_service hps(client_state_.server_state(),
&client_state_, true);
enum wsrep::provider::status ret(
client_state_.provider().replay(
client_state_.transaction().ws_handle(),
&hps));
++replays_;
return ret;
}

View File

@ -46,8 +46,8 @@ namespace wsrep
: wsrep::client_service() : wsrep::client_service()
, is_autocommit_() , is_autocommit_()
, do_2pc_() , do_2pc_()
, fail_next_applying_() // , fail_next_applying_()
, fail_next_toi_() // , fail_next_toi_()
, bf_abort_during_wait_() , bf_abort_during_wait_()
, error_during_prepare_data_() , error_during_prepare_data_()
, killed_before_certify_() , killed_before_certify_()
@ -89,16 +89,7 @@ namespace wsrep
void will_replay() WSREP_OVERRIDE { } void will_replay() WSREP_OVERRIDE { }
enum wsrep::provider::status enum wsrep::provider::status
replay() WSREP_OVERRIDE replay() WSREP_OVERRIDE;
{
enum wsrep::provider::status ret(
client_state_.provider().replay(
client_state_.transaction().ws_handle(),
&client_state_));
++replays_;
return ret;
}
void wait_for_replayers( void wait_for_replayers(
wsrep::unique_lock<wsrep::mutex>& lock) wsrep::unique_lock<wsrep::mutex>& lock)
WSREP_OVERRIDE WSREP_OVERRIDE
@ -172,8 +163,8 @@ namespace wsrep
// //
bool is_autocommit_; bool is_autocommit_;
bool do_2pc_; bool do_2pc_;
bool fail_next_applying_; // bool fail_next_applying_;
bool fail_next_toi_; // bool fail_next_toi_;
bool bf_abort_during_wait_; bool bf_abort_during_wait_;
bool error_during_prepare_data_; bool error_during_prepare_data_;
bool killed_before_certify_; bool killed_before_certify_;

View File

@ -0,0 +1,67 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "mock_high_priority_service.hpp"
#include "mock_server_state.hpp"
int wsrep::mock_high_priority_service::apply(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) WSREP_OVERRIDE
{
return server_state_.on_apply(*this, ws_handle, ws_meta, data);
}
int wsrep::mock_high_priority_service::start_transaction(
const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta)
{
return client_state_->start_transaction(ws_handle, ws_meta);
}
void wsrep::mock_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction)
{
client_state_->adopt_transaction(transaction);
}
int wsrep::mock_high_priority_service::apply_write_set(
const wsrep::const_buffer&)
{
assert(client_state_->toi_meta().seqno().is_undefined());
assert(client_state_->transaction().state() == wsrep::transaction::s_executing ||
client_state_->transaction().state() == wsrep::transaction::s_replaying);
return (fail_next_applying_ ? 1 : 0);
}
int wsrep::mock_high_priority_service::commit()
{
int ret(0);
if (client_state_->client_service().do_2pc())
{
ret = client_state_->before_prepare() ||
client_state_->after_prepare();
}
return (ret || client_state_->before_commit() ||
client_state_->ordered_commit() ||
client_state_->after_commit());
}
int wsrep::mock_high_priority_service::rollback()
{
return (client_state_->before_rollback() ||
client_state_->after_rollback());
}
int wsrep::mock_high_priority_service::apply_toi(const wsrep::const_buffer&)
{
assert(client_state_->transaction().active() == false);
assert(client_state_->toi_meta().seqno().is_undefined() == false);
return (fail_next_toi_ ? 1 : 0);
}
void wsrep::mock_high_priority_service::after_apply()
{
client_state_->after_statement();
}

View File

@ -0,0 +1,62 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_MOCK_HIGH_PRIORITY_SERVICE_HPP
#define WSREP_MOCK_HIGH_PRIORITY_SERVICE_HPP
#include "wsrep/high_priority_service.hpp"
#include "mock_client_state.hpp"
namespace wsrep
{
class mock_high_priority_service : public wsrep::high_priority_service
{
public:
mock_high_priority_service(
wsrep::server_state& server_state,
wsrep::mock_client_state* client_state,
bool replaying)
: wsrep::high_priority_service()
, fail_next_applying_()
, fail_next_toi_()
, server_state_(server_state)
, client_state_(client_state)
, replaying_(replaying)
{ }
int apply(const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&) WSREP_OVERRIDE;
int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE;
int commit() WSREP_OVERRIDE;
int rollback() WSREP_OVERRIDE;
int apply_toi(const wsrep::const_buffer&) WSREP_OVERRIDE;
void after_apply() WSREP_OVERRIDE;
void store_globals() WSREP_OVERRIDE { }
void reset_globals() WSREP_OVERRIDE { }
int log_dummy_write_set(const wsrep::ws_handle&,
const wsrep::ws_meta&)
WSREP_OVERRIDE { return 0; }
bool is_replaying() const WSREP_OVERRIDE { return replaying_; }
wsrep::mock_client_state* client_state()
{
return client_state_;
}
bool fail_next_applying_;
bool fail_next_toi_;
private:
mock_high_priority_service(const mock_high_priority_service&);
mock_high_priority_service& operator=(const mock_high_priority_service&);
wsrep::server_state& server_state_;
wsrep::mock_client_state* client_state_;
bool replaying_;
};
}
#endif // WSREP_MOCK_HIGH_PRIORITY_SERVICE_HPP

View File

@ -8,6 +8,7 @@
#include "wsrep/provider.hpp" #include "wsrep/provider.hpp"
#include "wsrep/logger.hpp" #include "wsrep/logger.hpp"
#include "wsrep/buffer.hpp" #include "wsrep/buffer.hpp"
#include "wsrep/high_priority_service.hpp"
#include <cstring> #include <cstring>
#include <map> #include <map>
@ -159,11 +160,13 @@ namespace wsrep
return release_result_; return release_result_;
} }
enum wsrep::provider::status replay(const wsrep::ws_handle&, enum wsrep::provider::status replay(const wsrep::ws_handle& ws_handle,
void* ctx) void* ctx)
{ {
wsrep::client_state& cc( wsrep::mock_high_priority_service& high_priority_service(
*static_cast<wsrep::client_state*>(ctx)); *static_cast<wsrep::mock_high_priority_service*>(ctx));
wsrep::mock_client_state& cc(
*high_priority_service.client_state());
wsrep::high_priority_context high_priority_context(cc); wsrep::high_priority_context high_priority_context(cc);
const wsrep::transaction& tc(cc.transaction()); const wsrep::transaction& tc(cc.transaction());
wsrep::ws_meta ws_meta; wsrep::ws_meta ws_meta;
@ -191,8 +194,8 @@ namespace wsrep
return replay_result_; return replay_result_;
} }
if (server_state_.on_apply(cc, tc.ws_handle(), ws_meta, if (high_priority_service.apply(ws_handle, ws_meta,
wsrep::const_buffer())) wsrep::const_buffer()))
{ {
return wsrep::provider::error_fatal; return wsrep::provider::error_fatal;
} }

View File

@ -7,6 +7,7 @@
#include "wsrep/server_state.hpp" #include "wsrep/server_state.hpp"
#include "mock_client_state.hpp" #include "mock_client_state.hpp"
#include "mock_high_priority_service.hpp"
#include "mock_provider.hpp" #include "mock_provider.hpp"
#include "wsrep/compiler.hpp" #include "wsrep/compiler.hpp"
@ -41,20 +42,37 @@ namespace wsrep
ret->open(ret->id()); ret->open(ret->id());
return ret; return ret;
} }
wsrep::client_state* streaming_applier_client_state()
{
wsrep::client_state* ret(
new wsrep::mock_client(
*this, ++last_client_id_,
wsrep::client_state::m_high_priority));
ret->open(ret->id());
return ret;
}
void release_client_state(wsrep::client_state* client_state) void release_client_state(wsrep::client_state* client_state)
{ {
delete client_state; delete client_state;
} }
wsrep::high_priority_service* streaming_applier_service()
{
wsrep::mock_client* cs(new wsrep::mock_client(
*this, ++last_client_id_,
wsrep::client_state::m_high_priority));
wsrep::mock_high_priority_service* ret(
new wsrep::mock_high_priority_service(*this, cs, false));
cs->open(cs->id());
cs->before_command();
return ret;
}
void release_high_priority_service(
wsrep::high_priority_service *high_priority_service)
WSREP_OVERRIDE
{
mock_high_priority_service* mhps(
dynamic_cast<mock_high_priority_service*>(high_priority_service));
wsrep::client_state* cs(mhps->client_state());
cs->after_command_before_result();
cs->after_command_after_result();
cs->close();
cs->cleanup();
delete cs;
delete mhps;
}
void bootstrap() WSREP_OVERRIDE { } void bootstrap() WSREP_OVERRIDE { }
void log_message(enum wsrep::log::level level, const char* message) void log_message(enum wsrep::log::level level, const char* message)
{ {

View File

@ -16,6 +16,7 @@ namespace
, cc(ss, , cc(ss,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority) wsrep::client_state::m_high_priority)
, hps(ss, &cc, false)
, ws_handle(1, (void*)1) , ws_handle(1, (void*)1)
, ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), , ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(wsrep::id("1"), 1, 1), wsrep::stid(wsrep::id("1"), 1, 1),
@ -24,9 +25,11 @@ namespace
wsrep::provider::flag::commit) wsrep::provider::flag::commit)
{ {
cc.open(cc.id()); cc.open(cc.id());
cc.before_command();
} }
wsrep::mock_server_state ss; wsrep::mock_server_state ss;
wsrep::mock_client cc; wsrep::mock_client cc;
wsrep::mock_high_priority_service hps;
wsrep::ws_handle ws_handle; wsrep::ws_handle ws_handle;
wsrep::ws_meta ws_meta; wsrep::ws_meta ws_meta;
}; };
@ -56,7 +59,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc,
applying_server_fixture) applying_server_fixture)
{ {
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 0); wsrep::const_buffer(buf, 1)) == 0);
const wsrep::transaction& txc(cc.transaction()); const wsrep::transaction& txc(cc.transaction());
// ::abort(); // ::abort();
@ -70,7 +73,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc,
applying_server_fixture) applying_server_fixture)
{ {
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 0); wsrep::const_buffer(buf, 1)) == 0);
const wsrep::transaction& txc(cc.transaction()); const wsrep::transaction& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction::s_committed); BOOST_REQUIRE(txc.state() == wsrep::transaction::s_committed);
@ -81,9 +84,9 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc,
BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback, BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback,
applying_server_fixture) applying_server_fixture)
{ {
cc.fail_next_applying_ = true; hps.fail_next_applying_ = true;
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 1); wsrep::const_buffer(buf, 1)) == 1);
const wsrep::transaction& txc(cc.transaction()); const wsrep::transaction& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted);
@ -94,9 +97,9 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback,
BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback, BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback,
applying_server_fixture) applying_server_fixture)
{ {
cc.fail_next_applying_ = true; hps.fail_next_applying_ = true;
char buf[1] = { 1 }; char buf[1] = { 1 };
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 1); wsrep::const_buffer(buf, 1)) == 1);
const wsrep::transaction& txc(cc.transaction()); const wsrep::transaction& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted);
@ -109,13 +112,16 @@ BOOST_AUTO_TEST_CASE(server_state_streaming)
wsrep::mock_client cc(ss, wsrep::mock_client cc(ss,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority); wsrep::client_state::m_high_priority);
cc.debug_log_level(1);
wsrep::mock_high_priority_service hps(ss, &cc, false);
wsrep::ws_handle ws_handle(1, (void*)1); wsrep::ws_handle ws_handle(1, (void*)1);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(wsrep::id("1"), 1, 1), wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(0), wsrep::seqno(0),
wsrep::provider::flag::start_transaction); wsrep::provider::flag::start_transaction);
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0); wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(ss.find_streaming_applier( BOOST_REQUIRE(ss.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id())); ws_meta.server_id(), ws_meta.transaction_id()));
@ -123,13 +129,13 @@ BOOST_AUTO_TEST_CASE(server_state_streaming)
wsrep::stid(wsrep::id("1"), 1, 1), wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(1), wsrep::seqno(1),
0); 0);
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0); wsrep::const_buffer("1", 1)) == 0);
ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)), ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(wsrep::id("1"), 1, 1), wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(1), wsrep::seqno(1),
wsrep::provider::flag::commit); wsrep::provider::flag::commit);
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0); wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(ss.find_streaming_applier( BOOST_REQUIRE(ss.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0); ws_meta.server_id(), ws_meta.transaction_id()) == 0);

View File

@ -250,7 +250,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
// Run before commit // Run before commit
BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(cc.before_commit());
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_abort); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_abort);
BOOST_REQUIRE(cc.current_error() == wsrep::e_error_during_commit); BOOST_REQUIRE(cc.current_error() == wsrep::e_size_exceeded_error);
BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);