1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-21 12:22:06 +03:00

Refactored most of the server_state interface into server_service

abstract interface.
This commit is contained in:
Teemu Ollakka
2018-06-18 16:52:41 +03:00
parent 4a92841cb2
commit ef5751943d
13 changed files with 61 additions and 98 deletions

View File

@ -56,6 +56,10 @@ namespace db
return 0; return 0;
} }
bool statement_allowed_for_streaming() const override
{
return true;
}
int prepare_fragment_for_replication(wsrep::mutable_buffer&) override int prepare_fragment_for_replication(wsrep::mutable_buffer&) override
{ {
return 0; return 0;

View File

@ -28,15 +28,16 @@ bool db::server_state::sst_before_init() const
return false; return false;
} }
std::string db::server_state::on_sst_required() std::string db::server_state::sst_request()
{ {
return id(); return id();
} }
void db::server_state::on_sst_request( int db::server_state::start_sst(
const std::string& request, const wsrep::gtid& gtid, bool bypass) const std::string& request, const wsrep::gtid& gtid, bool bypass)
{ {
server_.donate_sst(request, gtid, bypass); server_.donate_sst(request, gtid, bypass);
return 0;
} }
void db::server_state::background_rollback(wsrep::client_state&) void db::server_state::background_rollback(wsrep::client_state&)
@ -48,3 +49,8 @@ void db::server_state::log_dummy_write_set(
{ {
wsrep::log_info() << "Dummy write set: " << meta.seqno(); wsrep::log_info() << "Dummy write set: " << meta.seqno();
} }
void db::server_state::log_view(wsrep::client_state&, const wsrep::view&)
{
wsrep::log_info() << "View";
}

View File

@ -37,11 +37,12 @@ namespace db
wsrep::client_state* streaming_applier_client_state() override; wsrep::client_state* streaming_applier_client_state() override;
void release_client_state(wsrep::client_state*) override; void release_client_state(wsrep::client_state*) override;
bool sst_before_init() const override; bool sst_before_init() const override;
void on_sst_request(const std::string&, const wsrep::gtid&, bool) override; int start_sst(const std::string&, const wsrep::gtid&, bool) override;
std::string on_sst_required() override; std::string sst_request() override;
void background_rollback(wsrep::client_state&) override; void background_rollback(wsrep::client_state&) override;
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&) void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
override; override;
void log_view(wsrep::client_state&, const wsrep::view&) override;
private: private:
wsrep::default_mutex mutex_; wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_; wsrep::default_condition_variable cond_;

View File

@ -5,12 +5,17 @@
#ifndef WSREP_APPLYING_SERVICE_HPP #ifndef WSREP_APPLYING_SERVICE_HPP
#define WSREP_APPLYING_SERVICE_HPP #define WSREP_APPLYING_SERVICE_HPP
#include "transaction_termination_service.hpp"
namespace wsrep namespace wsrep
{ {
class applying_service class ws_handle;
class ws_meta;
class applying_service : public wsrep::transaction_termination_service
{ {
public: public:
virtual int start_transaction(); virtual int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
virtual int apply_write_set(const wsrep::const_buffer&) = 0; virtual int apply_write_set(const wsrep::const_buffer&) = 0;
virtual int apply_toi(const wsrep::const_buffer&); virtual int apply_toi(const wsrep::const_buffer&);
virtual int apply_nbo(const wsrep::const_buffer&); virtual int apply_nbo(const wsrep::const_buffer&);

View File

@ -66,6 +66,11 @@ namespace wsrep
// //
// Streaming // Streaming
// //
/**
* Return true if current statement is allowed for streaming,
* otherwise false.
*/
virtual bool statement_allowed_for_streaming() const = 0;
virtual size_t bytes_generated() const = 0; virtual size_t bytes_generated() const = 0;
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0;
virtual void remove_fragments() = 0; virtual void remove_fragments() = 0;

View File

@ -277,7 +277,10 @@ namespace wsrep
enum wsrep::streaming_context::fragment_unit enum wsrep::streaming_context::fragment_unit
fragment_unit, fragment_unit,
size_t fragment_size); size_t fragment_size);
bool statement_allowed_for_streaming() const
{
return client_service_.statement_allowed_for_streaming();
}
/** @todo deprecate */ /** @todo deprecate */
size_t bytes_generated() const size_t bytes_generated() const
{ {

View File

@ -12,8 +12,12 @@
namespace wsrep namespace wsrep
{ {
class client_state; class client_state;
class ws_meta;
class gtid;
class view;
class server_service class server_service
{ {
public:
/** /**
* Create client state instance which acts only locally, i.e. does * Create client state instance which acts only locally, i.e. does
* not participate in replication. However, local client * not participate in replication. However, local client

View File

@ -63,6 +63,7 @@
#include "mutex.hpp" #include "mutex.hpp"
#include "condition_variable.hpp" #include "condition_variable.hpp"
#include "server_service.hpp"
#include <vector> #include <vector>
#include <string> #include <string>
@ -86,7 +87,7 @@ namespace wsrep
* *
* *
*/ */
class server_state class server_state : public wsrep::server_service
{ {
public: public:
/** /**
@ -190,24 +191,6 @@ namespace wsrep
*/ */
enum rollback_mode rollback_mode() const { return rollback_mode_; } enum rollback_mode rollback_mode() const { return rollback_mode_; }
/**
* Create client context which acts only locally, i.e. does
* not participate in replication. However, local client
* connection may execute transactions which require ordering,
* as when modifying local SR fragment storage requires
* strict commit ordering.
*
* @return Pointer to Client Context.
*/
virtual client_state* local_client_state() = 0;
/**
* Create applier context for streaming transaction.
*/
virtual client_state* streaming_applier_client_state() = 0;
virtual void release_client_state(wsrep::client_state*) = 0;
void start_streaming_applier( void start_streaming_applier(
const wsrep::id&, const wsrep::id&,
const wsrep::transaction_id&, const wsrep::transaction_id&,
@ -220,9 +203,6 @@ namespace wsrep
*/ */
client_state* find_streaming_applier(const wsrep::id&, client_state* find_streaming_applier(const wsrep::id&,
const wsrep::transaction_id&) const; const wsrep::transaction_id&) const;
virtual void log_dummy_write_set(wsrep::client_state&,
const wsrep::ws_meta&) = 0;
/** /**
* Load WSRep provider. * Load WSRep provider.
* *
@ -292,46 +272,6 @@ namespace wsrep
*/ */
void wait_until_state(wsrep::server_state::state) const; void wait_until_state(wsrep::server_state::state) const;
/**
* Virtual method to return true if the configured SST
* method requires SST to be performed before DBMS storage
* engine initialization, false otherwise.
*/
virtual bool sst_before_init() const = 0;
/**
* Virtual method which will be called on *joiner* when the provider
* requests the SST request information. This method should
* provide a string containing an information which the donor
* server can use to donate SST.
*/
virtual std::string on_sst_required() = 0;
/**
* Virtual method which will be called on *donor* when the
* SST request has been delivered by the provider.
* This method should initiate SST transfer or throw
* a wsrep::runtime_error
* if the SST transfer cannot be initiated. If the SST request
* initiation is succesful, the server remains in s_donor
* state until the SST is over or fails. The @param bypass
* should be passed to SST implementation. If the flag is true,
* no actual SST should happen, but the joiner server should
* be notified that the donor has seen the request. The notification
* should included @param gtid provided. This must be passed
* to sst_received() call on the joiner.
*
* @todo Figure out better exception for error codition.
*
* @param sst_request SST request string provided by the joiner.
* @param gtid GTID denoting the current replication position.
* @param bypass Boolean bypass flag.
*/
virtual void on_sst_request(const std::string& sst_request,
const wsrep::gtid& gtid,
bool bypass) = 0;
virtual void background_rollback(wsrep::client_state&) = 0;
/** /**
* *
*/ */
@ -373,20 +313,6 @@ namespace wsrep
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data); const wsrep::const_buffer& data);
/**
* This virtual method should be implemented by the DBMS
* to provide information if the current statement in processing
* is allowd for streaming replication.
*
* @return True if the statement is allowed for streaming
* replication, false otherwise.
*
* @todo Move to client service interface.
*/
virtual bool statement_allowed_for_streaming(
const wsrep::client_state& client_state,
const wsrep::transaction& transaction) const;
/** /**
* Set server wide wsrep debug logging level. * Set server wide wsrep debug logging level.
* *

View File

@ -273,6 +273,7 @@ int wsrep::server_state::on_apply(
return ret; return ret;
} }
#if 0
bool wsrep::server_state::statement_allowed_for_streaming( bool wsrep::server_state::statement_allowed_for_streaming(
const wsrep::client_state&, const wsrep::client_state&,
const wsrep::transaction&) const const wsrep::transaction&) const
@ -280,6 +281,7 @@ bool wsrep::server_state::statement_allowed_for_streaming(
/* Streaming not implemented yet. */ /* Streaming not implemented yet. */
return false; return false;
} }
#endif
void wsrep::server_state::start_streaming_applier( void wsrep::server_state::start_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,

View File

@ -163,8 +163,7 @@ int wsrep::transaction::before_prepare(
client_state_.debug_crash( client_state_.debug_crash(
"crash_last_fragment_commit_before_fragment_removal"); "crash_last_fragment_commit_before_fragment_removal");
lock.unlock(); lock.unlock();
if (client_state_.server_state().statement_allowed_for_streaming( if (client_state_.statement_allowed_for_streaming() == false)
client_state_, *this))
{ {
client_state_.override_error(wsrep::e_error_during_commit); client_state_.override_error(wsrep::e_error_during_commit);
ret = 1; ret = 1;

View File

@ -289,7 +289,7 @@ namespace
try try
{ {
std::string req(server_state.on_sst_required()); std::string req(server_state.sst_request());
*sst_req = ::strdup(req.c_str()); *sst_req = ::strdup(req.c_str());
*sst_req_len = strlen(req.c_str()); *sst_req_len = strlen(req.c_str());
return WSREP_CB_SUCCESS; return WSREP_CB_SUCCESS;
@ -369,7 +369,10 @@ namespace
wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
sizeof(req_gtid->uuid.data)), sizeof(req_gtid->uuid.data)),
wsrep::seqno(req_gtid->seqno)); wsrep::seqno(req_gtid->seqno));
server_state.on_sst_request(req, gtid, bypass); if (server_state.start_sst(req, gtid, bypass))
{
return WSREP_CB_FAILURE;
}
return WSREP_CB_SUCCESS; return WSREP_CB_SUCCESS;
} }
catch (const wsrep::runtime_error& e) catch (const wsrep::runtime_error& e)

View File

@ -124,6 +124,8 @@ namespace wsrep
return bytes_generated_; return bytes_generated_;
} }
bool statement_allowed_for_streaming() const WSREP_OVERRIDE
{ return true; }
int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer)
WSREP_OVERRIDE WSREP_OVERRIDE
{ {

View File

@ -40,6 +40,7 @@ namespace wsrep
*this, ++last_client_id_, *this, ++last_client_id_,
wsrep::client_state::m_high_priority); wsrep::client_state::m_high_priority);
} }
void release_client_state(wsrep::client_state* client_state) void release_client_state(wsrep::client_state* client_state)
{ {
delete client_state; delete client_state;
@ -50,15 +51,17 @@ namespace wsrep
WSREP_OVERRIDE WSREP_OVERRIDE
{ {
} }
void log_view(wsrep::client_state&, const wsrep::view&) { }
void on_connect() WSREP_OVERRIDE { } void on_connect() WSREP_OVERRIDE { }
void wait_until_connected() WSREP_OVERRIDE { } void wait_until_connected() WSREP_OVERRIDE { }
void on_view(const wsrep::view&) WSREP_OVERRIDE { } void on_view(const wsrep::view&) WSREP_OVERRIDE { }
void on_sync() WSREP_OVERRIDE { } void on_sync() WSREP_OVERRIDE { }
bool sst_before_init() const WSREP_OVERRIDE { return false; } bool sst_before_init() const WSREP_OVERRIDE { return false; }
std::string on_sst_required() WSREP_OVERRIDE { return ""; } std::string sst_request() WSREP_OVERRIDE { return ""; }
void on_sst_request(const std::string&, int start_sst(const std::string&,
const wsrep::gtid&, const wsrep::gtid&,
bool) WSREP_OVERRIDE { } bool) WSREP_OVERRIDE { return 0; }
void background_rollback(wsrep::client_state& client_state) void background_rollback(wsrep::client_state& client_state)
WSREP_OVERRIDE WSREP_OVERRIDE
{ {