1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-24 10:42:31 +03:00

Refactored storage service out of client service interface.

This commit is contained in:
Teemu Ollakka
2018-07-07 18:06:37 +03:00
parent a8be09161c
commit 2ac13100f7
31 changed files with 465 additions and 247 deletions

View File

@ -16,8 +16,8 @@ db::client::client(db::server& server,
, params_(params)
, server_(server)
, server_state_(server.server_state())
, client_state_(mutex_, cond_, this, server_state_, client_service_, client_id, mode)
, client_service_(client_state_)
, client_state_(mutex_, cond_, server_state_, client_service_, client_id, mode)
, client_service_(*this)
, se_trx_(server.storage_engine())
, stats_()
{ }
@ -114,7 +114,7 @@ void db::client::run_one_transaction()
{
// wsrep::log_debug() << "Commit";
assert(err == 0);
if (client_state_.do_2pc())
if (do_2pc())
{
err = err || client_state_.before_prepare();
err = err || client_state_.after_prepare();

View File

@ -44,6 +44,7 @@ namespace db
{ }
void start();
wsrep::client_state& client_state() { return client_state_; }
bool do_2pc() const { return false; }
private:
friend class db::server_state;
friend class db::client_service;

View File

@ -6,24 +6,22 @@
#include "db_high_priority_service.hpp"
#include "db_client.hpp"
db::client_service::client_service(db::client& client)
: wsrep::client_service()
, client_(client)
, client_state_(client_.client_state())
{ }
int db::client_service::commit(const wsrep::ws_handle&,
const wsrep::ws_meta&)
bool db::client_service::do_2pc() const
{
db::client* client(client_state_.client());
int ret(client_state_.before_commit());
if (ret == 0) client->se_trx_.commit();
ret = ret || client_state_.ordered_commit();
ret = ret || client_state_.after_commit();
return ret;
return client_.do_2pc();
}
int db::client_service::bf_rollback()
{
db::client* client(client_state_.client());
int ret(client_state_.before_rollback());
assert(ret == 0);
client->se_trx_.rollback();
client_.se_trx_.rollback();
ret = client_state_.after_rollback();
assert(ret == 0);
return ret;
@ -34,13 +32,13 @@ db::client_service::replay()
{
wsrep::high_priority_context high_priority_context(client_state_);
db::high_priority_service high_priority_service(
client_state_.client()->server_, client_state_.client());
client_.server_, client_);
auto ret(client_state_.provider().replay(
client_state_.transaction().ws_handle(),
&high_priority_service));
if (ret == wsrep::provider::success)
{
++client_state_.client()->stats_.replays;
++client_.stats_.replays;
}
return ret;
}

View File

@ -8,37 +8,26 @@
#include "wsrep/client_service.hpp"
#include "wsrep/transaction.hpp"
#include "db_client_state.hpp"
namespace db
{
class client;
class client_state;
class client_service : public wsrep::client_service
{
public:
client_service(db::client_state& client_state)
: wsrep::client_service()
, client_state_(client_state)
{ }
client_service(db::client& client);
bool do_2pc() const override
{
return client_state_.do_2pc();
}
bool do_2pc() const override;
bool interrupted() const override
{
return false;
}
void reset_globals() override
{
client_state_.reset_globals();
}
void reset_globals() override { }
void store_globals() override
{
client_state_.store_globals();
}
void store_globals() override { }
int prepare_data_for_replication() override
{
@ -56,6 +45,7 @@ namespace db
{
return true;
}
int prepare_fragment_for_replication(wsrep::mutable_buffer&) override
{
return 0;
@ -64,12 +54,6 @@ namespace db
void remove_fragments() override
{ }
// int apply_write_set(const wsrep::const_buffer&) override;
// int apply_toi(const wsrep::const_buffer&) override;
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int bf_rollback() override;
void will_replay() override
@ -79,17 +63,12 @@ namespace db
enum wsrep::provider::status replay()
override;
int append_fragment(const wsrep::transaction&, int,
const wsrep::const_buffer&) override
{
return 0;
}
void emergency_shutdown() { ::abort(); }
void debug_sync(const char*) override { }
void debug_crash(const char*) override { }
private:
db::client_state& client_state_;
db::client& client_;
wsrep::client_state& client_state_;
};
}

View File

@ -16,7 +16,6 @@ namespace db
public:
client_state(wsrep::mutex& mutex,
wsrep::condition_variable& cond,
db::client* client,
db::server_state& server_state,
wsrep::client_service& client_service,
const wsrep::client_id& client_id,
@ -27,22 +26,11 @@ namespace db
client_service,
client_id,
mode)
, client_(client)
, is_autocommit_(false)
, do_2pc_(false)
{ }
db::client* client() { return client_; }
void reset_globals() { }
void store_globals() { wsrep::client_state::store_globals(); }
bool is_autocommit() const { return is_autocommit_; }
bool do_2pc() const { return do_2pc_; }
private:
client_state(const client_state&);
client_state& operator=(const client_state&);
db::client* client_;
bool is_autocommit_;
bool do_2pc_;
};
}

View File

@ -7,7 +7,7 @@
#include "db_client.hpp"
db::high_priority_service::high_priority_service(
db::server& server, db::client* client)
db::server& server, db::client& client)
: wsrep::high_priority_service(server_.server_state())
, server_(server)
, client_(client)
@ -17,7 +17,7 @@ int db::high_priority_service::start_transaction(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
return client_->client_state().start_transaction(ws_handle, ws_meta);
return client_.client_state().start_transaction(ws_handle, ws_meta);
}
void db::high_priority_service::adopt_transaction(const wsrep::transaction&)
@ -27,8 +27,8 @@ void db::high_priority_service::adopt_transaction(const wsrep::transaction&)
int db::high_priority_service::apply_write_set(const wsrep::const_buffer&)
{
client_->se_trx_.start(client_);
client_->se_trx_.apply(client_->client_state().transaction());
client_.se_trx_.start(&client_);
client_.se_trx_.apply(client_.client_state().transaction());
return 0;
}
@ -41,29 +41,29 @@ int db::high_priority_service::apply_toi(
int db::high_priority_service::commit()
{
int ret(client_->client_state_.before_commit());
if (ret == 0) client_->se_trx_.commit();
ret = ret || client_->client_state_.ordered_commit();
ret = ret || client_->client_state_.after_commit();
int ret(client_.client_state_.before_commit());
if (ret == 0) client_.se_trx_.commit();
ret = ret || client_.client_state_.ordered_commit();
ret = ret || client_.client_state_.after_commit();
return ret;
}
int db::high_priority_service::rollback()
{
int ret(client_->client_state_.before_rollback());
int ret(client_.client_state_.before_rollback());
assert(ret == 0);
client_->se_trx_.rollback();
ret = client_->client_state_.after_rollback();
client_.se_trx_.rollback();
ret = client_.client_state_.after_rollback();
assert(ret == 0);
return ret;
}
void db::high_priority_service::after_apply()
{
client_->client_state_.after_statement();
client_.client_state_.after_statement();
}
bool db::high_priority_service::is_replaying() const
{
return (client_->client_state_.transaction().state() == wsrep::transaction::s_replaying);
return (client_.client_state_.transaction().state() == wsrep::transaction::s_replaying);
}

View File

@ -14,11 +14,14 @@ namespace db
class high_priority_service : public wsrep::high_priority_service
{
public:
high_priority_service(db::server& server, db::client* client);
high_priority_service(db::server& server, db::client& client);
int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) override;
void adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::const_buffer&) override;
int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&)
override
{ return 0; }
int commit() override;
int rollback() override;
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override;
@ -33,7 +36,7 @@ namespace db
high_priority_service(const high_priority_service&);
high_priority_service& operator=(const high_priority_service&);
db::server& server_;
db::client* client_;
db::client& client_;
};
}

View File

@ -36,7 +36,7 @@ void db::server::applier_thread()
simulator_.params());
wsrep::client_state* cc(static_cast<wsrep::client_state*>(
&applier.client_state()));
db::high_priority_service hps(*this, &applier);
db::high_priority_service hps(*this, applier);
cc->open(cc->id());
cc->before_command();
enum wsrep::provider::status ret(
@ -110,21 +110,6 @@ void db::server::donate_sst(const std::string& req,
simulator_.sst(*this, req, gtid, bypass);
}
wsrep::client_state* db::server::local_client_state()
{
std::ostringstream id_os;
size_t client_id(++last_client_id_);
db::client* client(new db::client(*this, client_id,
wsrep::client_state::m_local,
simulator_.params()));
return &client->client_state();
}
void db::server::release_client_state(wsrep::client_state* client_state)
{
db::client_state* db_client_state(
dynamic_cast<db::client_state*>(client_state));
delete db_client_state->client();
}
wsrep::high_priority_service* db::server::streaming_applier_service()
{

View File

@ -4,6 +4,7 @@
#include "db_server_service.hpp"
#include "db_server.hpp"
#include "db_storage_service.hpp"
#include "wsrep/logger.hpp"
#include "wsrep/high_priority_service.hpp"
@ -12,15 +13,16 @@ db::server_service::server_service(db::server& server)
: server_(server)
{ }
wsrep::client_state* db::server_service::local_client_state()
wsrep::storage_service* db::server_service::storage_service(
wsrep::client_service&)
{
return server_.local_client_state();
return new db::storage_service();
}
void db::server_service::release_client_state(
wsrep::client_state* client_state)
void db::server_service::release_storage_service(
wsrep::storage_service* storage_service)
{
server_.release_client_state(client_state);
delete storage_service;
}
wsrep::high_priority_service* db::server_service::streaming_applier_service()
@ -41,7 +43,12 @@ bool db::server_service::sst_before_init() const
std::string db::server_service::sst_request()
{
return server_.server_state().id();
std::ostringstream os;
os << server_.server_state().id();
wsrep::log_info() << "SST request: "
<< server_.server_state().id();
return os.str();
}
int db::server_service::start_sst(

View File

@ -15,8 +15,8 @@ namespace db
{
public:
server_service(db::server& server);
wsrep::client_state* local_client_state() override;
void release_client_state(wsrep::client_state*) override;
wsrep::storage_service* storage_service(wsrep::client_service&) override;
void release_storage_service(wsrep::storage_service*) override;
wsrep::high_priority_service* streaming_applier_service() override;
void release_high_priority_service(wsrep::high_priority_service*) override;

View File

@ -23,11 +23,12 @@ void db::simulator::sst(db::server& server,
const wsrep::gtid& gtid,
bool bypass)
{
size_t id;
wsrep::id id;
std::istringstream is(request);
is >> id;
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
auto i(servers_.find(id));
wsrep::log_info() << "SST request";
if (i == servers_.end())
{
throw wsrep::runtime_error("Server " + request + " not found");
@ -86,9 +87,10 @@ void db::simulator::start()
id_os << (i + 1);
std::ostringstream address_os;
address_os << "127.0.0.1:" << server_port(i);
wsrep::id server_id(id_os.str());
auto it(servers_.insert(
std::make_pair(
(i + 1),
server_id,
std::make_unique<db::server>(
*this,
name_os.str(),

View File

@ -46,7 +46,7 @@ namespace db
wsrep::default_mutex mutex_;
const db::params& params_;
std::map<size_t, std::unique_ptr<db::server>> servers_;
std::map<wsrep::id, std::unique_ptr<db::server>> servers_;
std::chrono::time_point<std::chrono::steady_clock> clients_start_;
std::chrono::time_point<std::chrono::steady_clock> clients_stop_;
public:

View File

@ -0,0 +1,33 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_STORAGE_SERVICE_HPP
#define WSREP_DB_STORAGE_SERVICE_HPP
#include "wsrep/storage_service.hpp"
#include "wsrep/exception.hpp"
namespace db
{
class storage_service : public wsrep::storage_service
{
int start_transaction() override
{ throw wsrep::not_implemented_error(); }
int append_fragment(const wsrep::id&, wsrep::client_id,
int,
const wsrep::const_buffer&) override
{ throw wsrep::not_implemented_error(); }
int update_fragment_meta(const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)
override
{ throw wsrep::not_implemented_error(); }
void store_globals() override { }
void reset_globals() override { }
};
}
#endif // WSREP_DB_STORAGE_SERVICE_HPP

View File

@ -69,9 +69,13 @@ namespace wsrep
virtual bool statement_allowed_for_streaming() const = 0;
virtual size_t bytes_generated() const = 0;
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0;
/**
* Remove fragments from the storage within current transaction.
* Fragment removal will be committed once the current transaction
* commits.
*/
virtual void remove_fragments() = 0;
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
//
// Rollback
//
@ -128,12 +132,6 @@ namespace wsrep
*/
virtual void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) = 0;
// Streaming replication
/**
* Append a write set fragment into fragment storage.
*/
virtual int append_fragment(const wsrep::transaction&, int flag, const wsrep::const_buffer&) = 0;
//
// Debug interface
//

View File

@ -306,6 +306,23 @@ namespace wsrep
enum wsrep::streaming_context::fragment_unit
fragment_unit,
size_t fragment_size);
/**
* Prepare write set meta data for fragment storage ordering.
* This method should be called from storage service commit
* or rollback before performing the operation.
*/
int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool is_commit)
{
assert(mode_ == m_high_priority);
assert(state_ == s_exec);
return transaction_.prepare_for_fragment_ordering(
ws_handle, ws_meta, is_commit);
}
/** @} */
/** @name Applying interface */
/** @{ */
int start_transaction(const wsrep::ws_handle& wsh,
@ -734,26 +751,6 @@ namespace wsrep
return to_c_string(mode);
}
class client_state_switch
{
public:
client_state_switch(wsrep::client_state& orig_context,
wsrep::client_state& current_context)
: orig_context_(orig_context)
, current_context_(current_context)
{
current_context_.client_service_.store_globals();
}
~client_state_switch()
{
orig_context_.client_service_.store_globals();
}
private:
client_state& orig_context_;
client_state& current_context_;
};
/**
* Utility class to switch the client state to high priority
* mode. The client is switched back to the original mode
@ -804,44 +801,6 @@ namespace wsrep
enum wsrep::client_state::mode orig_mode_;
};
class client_deleter
{
public:
client_deleter(wsrep::server_service& server_service)
: server_service_(server_service)
{ }
void operator()(wsrep::client_state* client_state)
{
server_service_.release_client_state(client_state);
}
private:
wsrep::server_service& server_service_;
};
template <class D>
class scoped_client_state
{
public:
scoped_client_state(wsrep::client_state* client_state, D deleter)
: client_state_(client_state)
, deleter_(deleter)
{
if (client_state_ == 0)
{
throw wsrep::runtime_error("Null client_state provided");
}
}
wsrep::client_state& client_state() { return *client_state_; }
~scoped_client_state()
{
deleter_(client_state_);
}
private:
scoped_client_state(const scoped_client_state&);
scoped_client_state& operator=(const scoped_client_state&);
wsrep::client_state* client_state_;
D deleter_;
};
}
#endif // WSREP_CLIENT_STATE_HPP

View File

@ -40,6 +40,7 @@ namespace wsrep
* Adopt a transaction.
*/
virtual void adopt_transaction(const wsrep::transaction&) = 0;
/**
* Apply a write set.
*
@ -50,6 +51,11 @@ namespace wsrep
*/
virtual int apply_write_set(const wsrep::const_buffer&) = 0;
/**
*
*/
virtual int append_fragment(const wsrep::ws_meta&,
const wsrep::const_buffer& data) = 0;
/**
* Commit a transaction.
*/

View File

@ -83,6 +83,7 @@ namespace wsrep
};
std::ostream& operator<<(std::ostream&, const wsrep::id& id);
std::istream& operator>>(std::istream&, wsrep::id& id);
}
#endif // WSREP_ID_HPP

View File

@ -20,7 +20,8 @@
namespace wsrep
{
class client_state;
class client_service;
class storage_service;
class high_priority_service;
class ws_meta;
class gtid;
@ -28,17 +29,11 @@ namespace wsrep
class server_service
{
public:
/**
* Create client state instance which acts only locally, i.e. does
* not participate in replication. However, local client
* state may execute transactions which require ordering,
* as when modifying local SR fragment storage requires
* strict commit ordering.
*
* @return Pointer to Client State.
*/
virtual wsrep::client_state* local_client_state() = 0;
virtual void release_client_state(wsrep::client_state*) = 0;
virtual wsrep::storage_service* storage_service(
wsrep::client_service&) = 0;
virtual void release_storage_service(wsrep::storage_service*) = 0;
/**
* Create an applier state for streaming transaction applying.
*

View File

@ -87,6 +87,7 @@ namespace wsrep
class transaction;
class const_buffer;
class server_service;
/** @class Server Context
*
*
@ -172,11 +173,11 @@ namespace wsrep
const std::string& name() const { return name_; }
/**
* Return Server identifier string.
* Return Server identifier.
*
* @return Server indetifier string.
* @return Server identifier.
*/
const std::string& id() const { return id_; }
const wsrep::id& id() const { return id_; }
const std::string& incoming_address() const
{ return incoming_address_; }
@ -575,7 +576,7 @@ namespace wsrep
streaming_appliers_map streaming_appliers_;
wsrep::provider* provider_;
std::string name_;
std::string id_;
wsrep::id id_;
std::string incoming_address_;
std::string address_;
std::string working_dir_;

View File

@ -0,0 +1,75 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
/** @file storage_service.hpp
*
* Abstract interface which defines required access to DBMS storage
* service. The service is used for storing streaming replication
* write set fragments into stable storage. The interface is used
* from locally processing transaction context only. Corresponding
* operations for high priority processing can be found from
* wsrep::high_priority_service interface.
*/
#ifndef WSREP_STORAGE_SERVICE_HPP
#define WSREP_STORAGE_SERVICE_HPP
#include "client_id.hpp"
#include "id.hpp"
#include "buffer.hpp"
namespace wsrep
{
// Forward declarations
class ws_handle;
class ws_meta;
/**
* Storage service abstract interface.
*/
class storage_service
{
public:
virtual ~storage_service() { }
/**
* Start a new transaction for storage access.
*
* @param ws_hande Write set handle
* @param ws_meta Write set meta data
*
* @return Zero in case of success, non-zero on error.
*/
virtual int start_transaction() = 0;
/**
* Append fragment into stable storage.
*/
virtual int append_fragment(const wsrep::id& server_id,
wsrep::client_id client_id,
int flags,
const wsrep::const_buffer& data) = 0;
/**
* Update fragment meta data after certification process.
*/
virtual int update_fragment_meta(const wsrep::ws_meta&) = 0;
/**
* Commit the transaction.
*/
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
/**
* Roll back the transaction.
*/
virtual int rollback(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
virtual void store_globals() = 0;
virtual void reset_globals() = 0;
};
}
#endif // WSREP_STORAGE_SERVICE_HPP

View File

@ -87,6 +87,10 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta);
int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool is_commit);
int start_replaying(const wsrep::ws_meta&);
int append_key(const wsrep::key&);

View File

@ -52,3 +52,11 @@ std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id)
return (os << uuid_str);
}
}
std::istream& wsrep::operator>>(std::istream& is, wsrep::id& id)
{
std::string id_str;
std::getline(is, id_str);
id = wsrep::id(id_str);
return is;
}

View File

@ -576,6 +576,10 @@ void wsrep::server_state::on_sync()
case s_joiner:
state(lock, s_initializing);
break;
case s_donor:
state(lock, s_joined);
state(lock, s_synced);
break;
case s_initialized:
state(lock, s_joined);
// fall through

View File

@ -5,6 +5,7 @@
#include "wsrep/transaction.hpp"
#include "wsrep/client_state.hpp"
#include "wsrep/server_state.hpp"
#include "wsrep/storage_service.hpp"
#include "wsrep/high_priority_service.hpp"
#include "wsrep/key.hpp"
#include "wsrep/logger.hpp"
@ -20,6 +21,65 @@
else wsrep::log_debug() << msg; \
} while (0)
namespace
{
class storage_service_deleter
{
public:
storage_service_deleter(wsrep::server_service& server_service)
: server_service_(server_service)
{ }
void operator()(wsrep::storage_service* storage_service)
{
server_service_.release_storage_service(storage_service);
}
private:
wsrep::server_service& server_service_;
};
template <class D>
class scoped_storage_service
{
public:
scoped_storage_service(wsrep::client_service& client_service,
wsrep::storage_service* storage_service,
D deleter)
: client_service_(client_service)
, storage_service_(storage_service)
, deleter_(deleter)
{
if (storage_service_ == 0)
{
throw wsrep::runtime_error("Null client_state provided");
}
client_service_.reset_globals();
storage_service_->store_globals();
}
wsrep::storage_service& storage_service()
{
return *storage_service_;
}
~scoped_storage_service()
{
storage_service_->reset_globals();
client_service_.store_globals();
deleter_(storage_service_);
}
private:
scoped_storage_service(const scoped_storage_service&);
scoped_storage_service& operator=(const scoped_storage_service&);
wsrep::client_service& client_service_;
wsrep::storage_service* storage_service_;
D deleter_;
};
}
// Public
wsrep::transaction::transaction(
@ -94,6 +154,20 @@ int wsrep::transaction::start_transaction(
return 0;
}
int wsrep::transaction::prepare_for_fragment_ordering(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool is_commit)
{
assert(active());
ws_handle_ = ws_handle;
ws_meta_ = ws_meta;
certified_ = is_commit;
return 0;
}
int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta)
{
ws_meta_ = ws_meta;
@ -745,25 +819,32 @@ int wsrep::transaction::certify_fragment(
return 1;
}
// Client context to store fragment in separate transaction
// Switch temporarily to sr_transaction, switch back
// to original when this goes out of scope
wsrep::scoped_client_state<wsrep::client_deleter> sr_client_state_scope(
server_service_.local_client_state(),
wsrep::client_deleter(server_service_));
wsrep::client_state& sr_client_state(
sr_client_state_scope.client_state());
wsrep::client_state_switch client_state_switch(
client_state_,
sr_client_state);
// The rest of this method will be executed in storage service
// scope.
scoped_storage_service<storage_service_deleter>
sr_scope(
client_service_,
server_service_.storage_service(client_service_),
storage_service_deleter(server_service_));
wsrep::storage_service& storage_service(
sr_scope.storage_service());
wsrep::unique_lock<wsrep::mutex> sr_lock(sr_client_state.mutex());
wsrep::transaction& sr_transaction(
sr_client_state.transaction_);
sr_transaction.state(sr_lock, s_certifying);
sr_lock.unlock();
if (sr_client_state.client_service().append_fragment(
sr_transaction, flags_,
// First the fragment is appended to the stable storage.
// This is done to ensure that there is enough capacity
// available to store the fragment. The fragment meta data
// is updated after certification.
if (storage_service.start_transaction())
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_append_fragment_error);
return 1;
}
if (storage_service.append_fragment(
client_state_.server_state().id(),
client_state_.id(),
flags_,
wsrep::const_buffer(data.data(), data.size())))
{
lock.lock();
@ -774,30 +855,22 @@ int wsrep::transaction::certify_fragment(
enum wsrep::provider::status
cert_ret(provider().certify(client_state_.id().get(),
sr_transaction.ws_handle_,
flags_,
sr_transaction.ws_meta_));
ws_handle_,
flags_,
ws_meta_));
int ret(0);
switch (cert_ret)
{
case wsrep::provider::success:
streaming_context_.certified(sr_transaction.ws_meta().seqno());
sr_lock.lock();
sr_transaction.certified_ = true;
sr_transaction.state(sr_lock, s_committing);
sr_lock.unlock();
if (sr_client_state.client_service().commit(
sr_transaction.ws_handle(), sr_transaction.ws_meta()))
streaming_context_.certified(ws_meta_.seqno());
if (storage_service.commit(ws_handle_, ws_meta_))
{
ret = 1;
}
break;
default:
sr_lock.lock();
sr_transaction.state(sr_lock, s_must_abort);
sr_lock.unlock();
sr_client_state.client_service().bf_rollback();
storage_service.rollback(ws_handle_, ws_meta_);
ret = 1;
break;
}

View File

@ -3,8 +3,9 @@
#
add_executable(wsrep-lib_test
mock_high_priority_service.cpp
mock_client_state.cpp
mock_high_priority_service.cpp
mock_storage_service.cpp
test_utils.cpp
id_test.cpp
server_context_test.cpp

View File

@ -6,31 +6,6 @@
#include "mock_client_state.hpp"
#include "mock_high_priority_service.hpp"
int wsrep::mock_client_service::commit(
const wsrep::ws_handle&, const wsrep::ws_meta&)
{
int ret(0);
if (do_2pc())
{
if (client_state_.before_prepare())
{
ret = 1;
}
else if (client_state_.after_prepare())
{
ret = 1;
}
}
if (ret == 0 &&
(client_state_.before_commit() ||
client_state_.ordered_commit() ||
client_state_.after_commit()))
{
ret = 1;
}
return ret;
}
int wsrep::mock_client_service::bf_rollback()
{
int ret(0);

View File

@ -63,21 +63,15 @@ namespace wsrep
int apply_toi(const wsrep::const_buffer&) WSREP_OVERRIDE;
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
int bf_rollback() WSREP_OVERRIDE;
bool is_autocommit() const WSREP_OVERRIDE
{ return is_autocommit_; }
bool do_2pc() const WSREP_OVERRIDE
{ return do_2pc_; }
bool interrupted() const WSREP_OVERRIDE
{ return killed_before_certify_; }
void reset_globals() WSREP_OVERRIDE { }
void emergency_shutdown() WSREP_OVERRIDE { ++aborts_; }
int append_fragment(const wsrep::transaction&,
@ -135,6 +129,7 @@ namespace wsrep
}
void store_globals() WSREP_OVERRIDE { }
void reset_globals() WSREP_OVERRIDE { }
void debug_sync(const char* sync_point) WSREP_OVERRIDE
{
@ -181,7 +176,6 @@ namespace wsrep
//
size_t replays() const { return replays_; }
size_t aborts() const { return aborts_; }
private:
wsrep::mock_client_state& client_state_;
size_t replays_;

View File

@ -33,6 +33,9 @@ namespace wsrep
void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE;
int append_fragment(const wsrep::ws_meta&,
const wsrep::const_buffer& data) WSREP_OVERRIDE
{ return 0; }
int commit() WSREP_OVERRIDE;
int rollback() WSREP_OVERRIDE;
int apply_toi(const wsrep::ws_meta&,

View File

@ -9,6 +9,7 @@
#include "wsrep/server_service.hpp"
#include "mock_client_state.hpp"
#include "mock_high_priority_service.hpp"
#include "mock_storage_service.hpp"
#include "mock_provider.hpp"
#include "wsrep/compiler.hpp"
@ -32,9 +33,22 @@ namespace wsrep
, cond_()
, provider_(*this)
, last_client_id_(0)
, last_transaction_id_(0)
{ }
wsrep::mock_provider& provider() const
{ return provider_; }
wsrep::storage_service* storage_service(wsrep::client_service&)
{
return new wsrep::mock_storage_service(*this, ++last_client_id_);
}
void release_storage_service(wsrep::storage_service* storage_service)
{
delete storage_service;
}
wsrep::client_state* local_client_state()
{
wsrep::client_state* ret(new wsrep::mock_client(
@ -43,6 +57,7 @@ namespace wsrep
ret->open(ret->id());
return ret;
}
void release_client_state(wsrep::client_state* client_state)
{
delete client_state;
@ -103,6 +118,11 @@ namespace wsrep
int wait_committing_transactions(int) WSREP_OVERRIDE { return 0; }
wsrep::transaction_id next_transaction_id()
{
return wsrep::transaction_id(++last_transaction_id_);
}
void debug_sync(const char* sync_point) WSREP_OVERRIDE
{
if (sync_point_enabled_ == sync_point)
@ -128,6 +148,7 @@ namespace wsrep
wsrep::default_condition_variable cond_;
mutable wsrep::mock_provider provider_;
unsigned long long last_client_id_;
unsigned long long last_transaction_id_;
};
}

View File

@ -0,0 +1,56 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "mock_storage_service.hpp"
#include "mock_server_state.hpp"
#include "wsrep/client_state.hpp"
wsrep::mock_storage_service::mock_storage_service(
wsrep::mock_server_state& server_state,
wsrep::client_id client_id)
: server_state_(server_state)
, client_service_(client_state_)
, client_state_(server_state, client_service_, client_id,
wsrep::client_state::m_high_priority)
{
client_state_.open(client_id);
client_state_.before_command();
}
wsrep::mock_storage_service::~mock_storage_service()
{
client_state_.after_command_before_result();
client_state_.after_command_after_result();
client_state_.close();
client_state_.cleanup();
}
int wsrep::mock_storage_service::start_transaction()
{
return client_state_.start_transaction(
server_state_.next_transaction_id());
}
int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
return client_state_.prepare_for_fragment_ordering(
ws_handle, ws_meta, true) ||
client_state_.before_commit() ||
client_state_.ordered_commit() ||
client_state_.after_commit() ||
client_state_.after_statement();
}
int wsrep::mock_storage_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
return client_state_.prepare_for_fragment_ordering(
ws_handle, ws_meta, false) ||
client_state_.before_rollback() ||
client_state_.after_rollback() ||
client_state_.after_statement();
}

View File

@ -0,0 +1,48 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_MOCK_STORAGE_SERVICE_HPP
#define WSREP_MOCK_STORAGE_SERVICE_HPP
#include "wsrep/storage_service.hpp"
#include "mock_client_state.hpp"
namespace wsrep
{
class mock_server_state;
class mock_storage_service : public wsrep::storage_service
{
public:
mock_storage_service(wsrep::mock_server_state&,
wsrep::client_id);
~mock_storage_service();
int start_transaction() WSREP_OVERRIDE;
int append_fragment(const wsrep::id&,
wsrep::client_id,
int,
const wsrep::const_buffer&)
WSREP_OVERRIDE
{ return 0; }
int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
void store_globals() WSREP_OVERRIDE { }
void reset_globals() WSREP_OVERRIDE { }
private:
wsrep::mock_server_state& server_state_;
wsrep::mock_client_service client_service_;
wsrep::mock_client_state client_state_;
};
}
#endif // WSREP_MOCK_STORAGE_SERVICE_HPP