1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-02 05:22:26 +03:00

Refactored server_service out of server_state.

This commit is contained in:
Teemu Ollakka
2018-06-18 17:45:24 +03:00
parent ef5751943d
commit cf231bdf2d
14 changed files with 142 additions and 89 deletions

View File

@ -7,6 +7,7 @@ add_executable(dbsim
db_client_service.cpp
db_params.cpp
db_server.cpp
db_server_service.cpp
db_server_state.cpp
db_simulator.cpp
db_storage_engine.cpp

View File

@ -3,6 +3,7 @@
//
#include "db_server.hpp"
#include "db_server_service.hpp"
#include "db_client.hpp"
#include "db_simulator.hpp"
@ -16,7 +17,9 @@ db::server::server(simulator& simulator,
, storage_engine_(simulator_.params())
, mutex_()
, cond_()
, server_state_(*this, name, server_id, address, "dbsim_" + name + "_data")
, server_service_(*this)
, server_state_(*this, server_service_,
name, server_id, address, "dbsim_" + name + "_data")
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()

View File

@ -10,6 +10,7 @@
#include "db_storage_engine.hpp"
#include "db_server_state.hpp"
#include "db_server_service.hpp"
#include <boost/thread.hpp>
@ -51,6 +52,7 @@ namespace db
db::storage_engine storage_engine_;
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;
db::server_service server_service_;
db::server_state server_state_;
std::atomic<size_t> last_client_id_;
std::atomic<size_t> last_transaction_id_;

View File

@ -0,0 +1,60 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "db_server_service.hpp"
#include "db_server.hpp"
#include "wsrep/logger.hpp"
db::server_service::server_service(db::server& server)
: server_(server)
{ }
wsrep::client_state* db::server_service::local_client_state()
{
return server_.local_client_state();
}
wsrep::client_state* db::server_service::streaming_applier_client_state()
{
return server_.streaming_applier_client_state();
}
void db::server_service::release_client_state(
wsrep::client_state* client_state)
{
server_.release_client_state(client_state);
}
bool db::server_service::sst_before_init() const
{
return false;
}
std::string db::server_service::sst_request()
{
return server_.server_state().id();
}
int db::server_service::start_sst(
const std::string& request, const wsrep::gtid& gtid, bool bypass)
{
server_.donate_sst(request, gtid, bypass);
return 0;
}
void db::server_service::background_rollback(wsrep::client_state&)
{
}
void db::server_service::log_dummy_write_set(
wsrep::client_state&, const wsrep::ws_meta& meta)
{
wsrep::log_info() << "Dummy write set: " << meta.seqno();
}
void db::server_service::log_view(wsrep::client_state&, const wsrep::view&)
{
wsrep::log_info() << "View";
}

View File

@ -0,0 +1,33 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_DB_SERVER_SERVICE_HPP
#define WSREP_DB_SERVER_SERVICE_HPP
#include "wsrep/server_service.hpp"
#include <string>
namespace db
{
class server;
class server_service : public wsrep::server_service
{
public:
server_service(db::server& server);
wsrep::client_state* local_client_state() override;
wsrep::client_state* streaming_applier_client_state() override;
void release_client_state(wsrep::client_state*) override;
bool sst_before_init() const override;
int start_sst(const std::string&, const wsrep::gtid&, bool) override;
std::string sst_request() override;
void background_rollback(wsrep::client_state&) override;
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
override;
void log_view(wsrep::client_state&, const wsrep::view&) override;
private:
db::server& server_;
};
}
#endif // WSREP_DB_SERVER_SERVICE_HPP

View File

@ -7,50 +7,3 @@
#include "wsrep/logger.hpp"
wsrep::client_state* db::server_state::local_client_state()
{
return server_.local_client_state();
}
wsrep::client_state* db::server_state::streaming_applier_client_state()
{
return server_.streaming_applier_client_state();
}
void db::server_state::release_client_state(
wsrep::client_state* client_state)
{
server_.release_client_state(client_state);
}
bool db::server_state::sst_before_init() const
{
return false;
}
std::string db::server_state::sst_request()
{
return id();
}
int db::server_state::start_sst(
const std::string& request, const wsrep::gtid& gtid, bool bypass)
{
server_.donate_sst(request, gtid, bypass);
return 0;
}
void db::server_state::background_rollback(wsrep::client_state&)
{
}
void db::server_state::log_dummy_write_set(
wsrep::client_state&, const wsrep::ws_meta& meta)
{
wsrep::log_info() << "Dummy write set: " << meta.seqno();
}
void db::server_state::log_view(wsrep::client_state&, const wsrep::view&)
{
wsrep::log_info() << "View";
}

View File

@ -6,6 +6,7 @@
#define WSREP_DB_SERVER_CONTEXT_HPP
#include "wsrep/server_state.hpp"
#include "wsrep/server_service.hpp"
#include "wsrep/client_state.hpp"
#include <atomic>
@ -17,6 +18,7 @@ namespace db
{
public:
server_state(db::server& server,
wsrep::server_service& server_service,
const std::string& name,
const std::string& server_id,
const std::string& address,
@ -24,6 +26,7 @@ namespace db
: wsrep::server_state(
mutex_,
cond_,
server_service,
name,
server_id,
address,
@ -33,16 +36,6 @@ namespace db
, cond_()
, server_(server)
{ }
wsrep::client_state* local_client_state() override;
wsrep::client_state* streaming_applier_client_state() override;
void release_client_state(wsrep::client_state*) override;
bool sst_before_init() const override;
int start_sst(const std::string&, const wsrep::gtid&, bool) override;
std::string sst_request() override;
void background_rollback(wsrep::client_state&) override;
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
override;
void log_view(wsrep::client_state&, const wsrep::view&) override;
private:
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;

View File

@ -2,6 +2,7 @@
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
/** @file server_service.hpp
*
* An abstract interface for a DBMS server services.
@ -9,6 +10,11 @@
* the wsrep-lib.
*/
#ifndef WSREP_SERVER_SERVICE_HPP
#define WSREP_SERVER_SERVICE_HPP
#include <string>
namespace wsrep
{
class client_state;
@ -99,3 +105,5 @@ namespace wsrep
};
}
#endif // WSREP_SERVER_SERVICE_HPP

View File

@ -87,7 +87,7 @@ namespace wsrep
*
*
*/
class server_state : public wsrep::server_service
class server_state
{
public:
/**
@ -157,6 +157,9 @@ namespace wsrep
virtual ~server_state();
wsrep::server_service& server_service() { return server_service_; }
/**
* Return human readable server name.
*
@ -347,6 +350,7 @@ namespace wsrep
*/
server_state(wsrep::mutex& mutex,
wsrep::condition_variable& cond,
wsrep::server_service& server_service,
const std::string& name,
const std::string& id,
const std::string& address,
@ -354,6 +358,7 @@ namespace wsrep
enum rollback_mode rollback_mode)
: mutex_(mutex)
, cond_(cond)
, server_service_(server_service)
, state_(s_disconnected)
, state_waiters_(n_states_)
, streaming_appliers_()
@ -375,6 +380,7 @@ namespace wsrep
wsrep::mutex& mutex_;
wsrep::condition_variable& cond_;
wsrep::server_service& server_service_;
enum state state_;
mutable std::vector<int> state_waiters_;
typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>, wsrep::client_state*> streaming_appliers_map;
@ -391,15 +397,15 @@ namespace wsrep
class client_deleter
{
public:
client_deleter(wsrep::server_state& server_state)
: server_state_(server_state)
client_deleter(wsrep::server_service& server_service)
: server_service_(server_service)
{ }
void operator()(wsrep::client_state* client_state)
{
server_state_.release_client_state(client_state);
server_service_.release_client_state(client_state);
}
private:
wsrep::server_state& server_state_;
wsrep::server_service& server_service_;
};
static inline std::string to_string(enum wsrep::server_state::state state)

View File

@ -137,6 +137,7 @@ namespace wsrep
void debug_log_state(const char*) const;
wsrep::provider& provider_;
wsrep::server_service& server_service_;
wsrep::client_state& client_state_;
wsrep::transaction_id id_;
enum state state_;

View File

@ -173,7 +173,7 @@ int wsrep::server_state::on_apply(
assert(not_replaying);
assert(find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::client_state* sac(streaming_applier_client_state());
wsrep::client_state* sac(server_service_.streaming_applier_client_state());
start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sac);
sac->start_transaction(ws_handle, ws_meta);
@ -191,7 +191,7 @@ int wsrep::server_state::on_apply(
sac->after_command_before_result();
sac->after_command_after_result();
}
log_dummy_write_set(client_state, ws_meta);
server_service_.log_dummy_write_set(client_state, ws_meta);
}
else if (ws_meta.flags() == 0)
{
@ -219,7 +219,7 @@ int wsrep::server_state::on_apply(
sac->after_command_before_result();
sac->after_command_after_result();
}
log_dummy_write_set(client_state, ws_meta);
server_service_.log_dummy_write_set(client_state, ws_meta);
}
else if (commits_transaction(ws_meta.flags()))
{
@ -251,7 +251,7 @@ int wsrep::server_state::on_apply(
sac->after_command_after_result();
stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
release_client_state(sac);
server_service_.release_client_state(sac);
}
}
else
@ -273,16 +273,6 @@ int wsrep::server_state::on_apply(
return ret;
}
#if 0
bool wsrep::server_state::statement_allowed_for_streaming(
const wsrep::client_state&,
const wsrep::transaction&) const
{
/* Streaming not implemented yet. */
return false;
}
#endif
void wsrep::server_state::start_streaming_applier(
const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id,

View File

@ -24,6 +24,7 @@
wsrep::transaction::transaction(
wsrep::client_state& client_state)
: provider_(client_state.provider())
, server_service_(client_state.server_state().server_service())
, client_state_(client_state)
, id_(transaction_id::invalid())
, state_(s_executing)
@ -635,7 +636,7 @@ bool wsrep::transaction::bf_abort(
// rollbacker gets control.
state(lock, wsrep::transaction::s_aborting);
lock.unlock();
client_state_.server_state().background_rollback(client_state_);
server_service_.background_rollback(client_state_);
}
}
return ret;
@ -721,8 +722,8 @@ int wsrep::transaction::certify_fragment(
// std::auto_ptr<wsrep::client_state> sr_client_state(
// client_state_.server_state().local_client_state());
wsrep::scoped_client_state<wsrep::client_deleter> sr_client_state_scope(
client_state_.server_state().local_client_state(),
wsrep::client_deleter(client_state_.server_state()));
server_service_.local_client_state(),
wsrep::client_deleter(server_service_));
wsrep::client_state& sr_client_state(
sr_client_state_scope.client_state());
wsrep::client_state_switch client_state_switch(
@ -928,7 +929,7 @@ void wsrep::transaction::streaming_rollback()
{
assert(streaming_context_.rolled_back() == false);
wsrep::client_state* sac(
client_state_.server_state().streaming_applier_client_state());
server_service_.streaming_applier_client_state());
client_state_.server_state().start_streaming_applier(
client_state_.server_state().id(), id(), sac);
sac->adopt_transaction(*this);

View File

@ -289,7 +289,7 @@ namespace
try
{
std::string req(server_state.sst_request());
std::string req(server_state.server_service().sst_request());
*sst_req = ::strdup(req.c_str());
*sst_req_len = strlen(req.c_str());
return WSREP_CB_SUCCESS;
@ -369,7 +369,7 @@ namespace
wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
sizeof(req_gtid->uuid.data)),
wsrep::seqno(req_gtid->seqno));
if (server_state.start_sst(req, gtid, bypass))
if (server_state.server_service().start_sst(req, gtid, bypass))
{
return WSREP_CB_FAILURE;
}

View File

@ -13,13 +13,15 @@
namespace wsrep
{
class mock_server_state : public wsrep::server_state
class mock_server_state
: public wsrep::server_state
, public wsrep::server_service
{
public:
mock_server_state(const std::string& name,
const std::string& id,
enum wsrep::server_state::rollback_mode rollback_mode)
: wsrep::server_state(mutex_, cond_,
: wsrep::server_state(mutex_, cond_, *this,
name, id, "", "./", rollback_mode)
, mutex_()
, cond_()