1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Execution context switching for high priority service.

This commit is contained in:
Teemu Ollakka
2018-07-09 11:35:20 +03:00
parent 95dbab4c08
commit 8a1e76bcec
10 changed files with 77 additions and 12 deletions

View File

@ -28,6 +28,8 @@ namespace db
void after_apply() override; void after_apply() override;
void store_globals() override { } void store_globals() override { }
void reset_globals() override { } void reset_globals() override { }
void switch_execution_context(wsrep::high_priority_service&) override
{ }
int log_dummy_write_set(const wsrep::ws_handle&, int log_dummy_write_set(const wsrep::ws_handle&,
const wsrep::ws_meta&) const wsrep::ws_meta&)
{ return 0; } { return 0; }

View File

@ -25,7 +25,14 @@ void db::server_service::release_storage_service(
delete storage_service; delete storage_service;
} }
wsrep::high_priority_service* db::server_service::streaming_applier_service() wsrep::high_priority_service* db::server_service::streaming_applier_service(
wsrep::client_service&)
{
return server_.streaming_applier_service();
}
wsrep::high_priority_service* db::server_service::streaming_applier_service(
wsrep::high_priority_service&)
{ {
return server_.streaming_applier_service(); return server_.streaming_applier_service();
} }

View File

@ -17,7 +17,8 @@ namespace db
server_service(db::server& server); server_service(db::server& server);
wsrep::storage_service* storage_service(wsrep::client_service&) override; wsrep::storage_service* storage_service(wsrep::client_service&) override;
void release_storage_service(wsrep::storage_service*) override; void release_storage_service(wsrep::storage_service*) override;
wsrep::high_priority_service* streaming_applier_service() override; wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override;
wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override;
void release_high_priority_service(wsrep::high_priority_service*) override; void release_high_priority_service(wsrep::high_priority_service*) override;
bool sst_before_init() const override; bool sst_before_init() const override;

View File

@ -80,9 +80,24 @@ namespace wsrep
*/ */
virtual void after_apply() = 0; virtual void after_apply() = 0;
/**
* Store global execution context for high priority service.
*/
virtual void store_globals() = 0; virtual void store_globals() = 0;
/**
* Reset global execution context for high priority service.
*/
virtual void reset_globals() = 0; virtual void reset_globals() = 0;
/**
* Switch exection context to context of orig_hps.
*
* @param orig_hps Original high priority service.
*/
virtual void switch_execution_context(
wsrep::high_priority_service& orig_hps) = 0;
virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0; virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0;
virtual bool is_replaying() const = 0; virtual bool is_replaying() const = 0;
@ -102,6 +117,7 @@ namespace wsrep
, current_service_(current_service) , current_service_(current_service)
{ {
orig_service_.reset_globals(); orig_service_.reset_globals();
current_service_.switch_execution_context(orig_service_);
current_service_.store_globals(); current_service_.store_globals();
} }
~high_priority_switch() ~high_priority_switch()

View File

@ -34,13 +34,30 @@ namespace wsrep
wsrep::client_service&) = 0; wsrep::client_service&) = 0;
virtual void release_storage_service(wsrep::storage_service*) = 0; virtual void release_storage_service(wsrep::storage_service*) = 0;
/** /**
* Create an applier state for streaming transaction applying. * Create an applier state for streaming transaction applying.
* *
* @param orig_cs Reference to client service which is
* requesting a new streaming applier service
* instance.
*
* @return Pointer to streaming applier client state. * @return Pointer to streaming applier client state.
*/ */
virtual wsrep::high_priority_service* virtual wsrep::high_priority_service*
streaming_applier_service() = 0; streaming_applier_service(wsrep::client_service& orig_cs) = 0;
/**
* Create an applier state for streaming transaction applying.
*
* @param orig_hps Reference to high priority service which is
* requesting a new streaming applier service
* instance.
*
* @return Pointer to streaming applier client state.
*/
virtual wsrep::high_priority_service*
streaming_applier_service(wsrep::high_priority_service& orig_hps) = 0;
/** /**
* Release a client state allocated by either local_client_state() * Release a client state allocated by either local_client_state()

View File

@ -25,6 +25,7 @@ namespace wsrep
{ } { }
type get() const { return id_; } type get() const { return id_; }
static unsigned long long undefined() { return type(-1); } static unsigned long long undefined() { return type(-1); }
bool is_undefined() const { return (id_ == type(-1)); }
bool operator<(const transaction_id& other) const bool operator<(const transaction_id& other) const
{ {
return (id_ < other.id_); return (id_ < other.id_);

View File

@ -61,7 +61,8 @@ namespace
assert(server_state.find_streaming_applier( assert(server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0); ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::high_priority_service* sa( wsrep::high_priority_service* sa(
server_state.server_service().streaming_applier_service()); server_state.server_service().streaming_applier_service(
high_priority_service));
server_state.start_streaming_applier( server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sa); ws_meta.server_id(), ws_meta.transaction_id(), sa);
sa->start_transaction(ws_handle, ws_meta); sa->start_transaction(ws_handle, ws_meta);
@ -84,14 +85,13 @@ namespace
// commit fragment comes in. Although this is a valid // commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as // situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too. // it may be an indication of a bug too.
assert(0);
wsrep::log_warning() << "Could not find applier context for " wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id() << ws_meta.server_id()
<< ": " << ws_meta.transaction_id(); << ": " << ws_meta.transaction_id();
} }
else else
{ {
wsrep::high_priority_switch(high_priority_service, *sa); wsrep::high_priority_switch sw(high_priority_service, *sa);
ret = sa->apply_write_set(data); ret = sa->apply_write_set(data);
sa->after_apply(); sa->after_apply();
} }
@ -117,7 +117,6 @@ namespace
// commit fragment comes in. Although this is a valid // commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as // situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too. // it may be an indication of a bug too.
assert(0);
wsrep::log_warning() wsrep::log_warning()
<< "Could not find applier context for " << "Could not find applier context for "
<< ws_meta.server_id() << ws_meta.server_id()
@ -125,9 +124,14 @@ namespace
} }
else else
{ {
wsrep::high_priority_switch(high_priority_service, *sa); // Make high priority switch to go out of scope
ret = sa->commit(); // before the streaming applier is released.
sa->after_apply(); {
wsrep::high_priority_switch sw(
high_priority_service, *sa);
ret = sa->commit();
sa->after_apply();
}
server_state.stop_streaming_applier( server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()); ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(sa); server_state.server_service().release_high_priority_service(sa);

View File

@ -1080,7 +1080,8 @@ void wsrep::transaction::streaming_rollback()
{ {
assert(streaming_context_.rolled_back() == false); assert(streaming_context_.rolled_back() == false);
wsrep::high_priority_service* sa( wsrep::high_priority_service* sa(
server_service_.streaming_applier_service()); server_service_.streaming_applier_service(
client_state_.client_service()));
client_state_.server_state().start_streaming_applier( client_state_.server_state().start_streaming_applier(
client_state_.server_state().id(), id(), sa); client_state_.server_state().id(), id(), sa);
sa->adopt_transaction(*this); sa->adopt_transaction(*this);

View File

@ -43,6 +43,8 @@ namespace wsrep
void after_apply() WSREP_OVERRIDE; void after_apply() WSREP_OVERRIDE;
void store_globals() WSREP_OVERRIDE { } void store_globals() WSREP_OVERRIDE { }
void reset_globals() WSREP_OVERRIDE { } void reset_globals() WSREP_OVERRIDE { }
void switch_execution_context(wsrep::high_priority_service&)
WSREP_OVERRIDE { }
int log_dummy_write_set(const wsrep::ws_handle&, int log_dummy_write_set(const wsrep::ws_handle&,
const wsrep::ws_meta&) const wsrep::ws_meta&)
WSREP_OVERRIDE { return 0; } WSREP_OVERRIDE { return 0; }

View File

@ -63,7 +63,21 @@ namespace wsrep
delete client_state; delete client_state;
} }
wsrep::high_priority_service* streaming_applier_service() wsrep::high_priority_service* streaming_applier_service(
wsrep::client_service&)
{
wsrep::mock_client* cs(new wsrep::mock_client(
*this, ++last_client_id_,
wsrep::client_state::m_high_priority));
wsrep::mock_high_priority_service* ret(
new wsrep::mock_high_priority_service(*this, cs, false));
cs->open(cs->id());
cs->before_command();
return ret;
}
wsrep::high_priority_service* streaming_applier_service(
wsrep::high_priority_service&)
{ {
wsrep::mock_client* cs(new wsrep::mock_client( wsrep::mock_client* cs(new wsrep::mock_client(
*this, ++last_client_id_, *this, ++last_client_id_,