1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-04-19 21:02:17 +03:00

Interface changes required to store and remove fragments from high

priority context.
This commit is contained in:
Teemu Ollakka 2018-07-09 18:12:48 +03:00
parent 958a916b25
commit 6f68c70d37
10 changed files with 85 additions and 26 deletions

View File

@ -25,7 +25,9 @@ void db::high_priority_service::adopt_transaction(const wsrep::transaction&)
throw wsrep::not_implemented_error(); throw wsrep::not_implemented_error();
} }
int db::high_priority_service::apply_write_set(const wsrep::const_buffer&) int db::high_priority_service::apply_write_set(
const wsrep::ws_meta&,
const wsrep::const_buffer&)
{ {
client_.se_trx_.start(&client_); client_.se_trx_.start(&client_);
client_.se_trx_.apply(client_.client_state().transaction()); client_.se_trx_.apply(client_.client_state().transaction());

View File

@ -18,10 +18,15 @@ namespace db
int start_transaction(const wsrep::ws_handle&, int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) override; const wsrep::ws_meta&) override;
void adopt_transaction(const wsrep::transaction&) override; void adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::const_buffer&) override; int apply_write_set(const wsrep::ws_meta&,
int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) const wsrep::const_buffer&) override;
int append_fragment_and_commit(
const wsrep::ws_handle&,
const wsrep::ws_meta&, const wsrep::const_buffer&)
override override
{ return 0; } { return 0; }
int remove_fragments(const wsrep::ws_meta&) override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback() override; int rollback() override;
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override;

View File

@ -312,6 +312,12 @@ namespace wsrep
*/ */
void disable_streaming(); void disable_streaming();
void fragment_applied(wsrep::seqno seqno)
{
assert(mode_ == m_high_priority);
transaction_.fragment_applied(seqno);
}
/** /**
* Prepare write set meta data for ordering. * Prepare write set meta data for ordering.
* This method should be called before ordered commit or * This method should be called before ordered commit or

View File

@ -48,19 +48,43 @@ namespace wsrep
* as a part of the transaction. The caller must start a * as a part of the transaction. The caller must start a
* new transaction before applying a write set and must * new transaction before applying a write set and must
* either commit to make changes persistent or roll back. * either commit to make changes persistent or roll back.
*/
virtual int apply_write_set(const wsrep::const_buffer&) = 0;
/**
* *
*/ */
virtual int append_fragment(const wsrep::ws_meta&, virtual int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer& data) = 0; const wsrep::const_buffer&) = 0;
/**
* Append a fragment into fragment storage. This will be
* called after a non-committing fragment belonging to
* streaming transaction has been applied. The call will
* not be done within an open transaction, the implementation
* must start a new transaction and commit.
*
* Note that the call is not done from streaming transaction
* context, but from applier context.
*/
virtual int append_fragment_and_commit(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) = 0;
/**
* Remove fragments belonging to streaming transaction.
* This method will be called within the streaming transaction
* before commit. The implementation must not commit the
* whole transaction. The call will be done from streaming
* transaction context.
*
* @param ws_meta Write set meta data for commit fragment.
*
* @return Zero on success, non-zero on failure.
*/
virtual int remove_fragments(const wsrep::ws_meta& ws_meta) = 0;
/** /**
* Commit a transaction. * Commit a transaction.
*/ */
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
/** /**
* Roll back a transaction * Roll back a transaction
*/ */

View File

@ -52,6 +52,11 @@ namespace wsrep
bytes_certified_ += bytes; bytes_certified_ += bytes;
} }
void applied(wsrep::seqno seqno)
{
fragments_.push_back(seqno);
}
size_t fragments_certified() const size_t fragments_certified() const
{ {
return fragments_.size(); return fragments_.size();

View File

@ -87,6 +87,8 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle& ws_handle, int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta); const wsrep::ws_meta& ws_meta);
void fragment_applied(wsrep::seqno seqno);
int prepare_for_ordering(const wsrep::ws_handle& ws_handle, int prepare_for_ordering(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
bool is_commit); bool is_commit);

View File

@ -42,7 +42,7 @@ namespace
{ {
ret = 1; ret = 1;
} }
else if (high_priority_service.apply_write_set(data)) else if (high_priority_service.apply_write_set(ws_meta, data))
{ {
ret = 1; ret = 1;
} }
@ -68,10 +68,11 @@ namespace
sa->start_transaction(ws_handle, ws_meta); sa->start_transaction(ws_handle, ws_meta);
{ {
wsrep::high_priority_switch sw(high_priority_service, *sa); wsrep::high_priority_switch sw(high_priority_service, *sa);
sa->apply_write_set(data); sa->apply_write_set(ws_meta, data);
sa->after_apply(); sa->after_apply();
} }
high_priority_service.log_dummy_write_set(ws_handle, ws_meta); high_priority_service.append_fragment_and_commit(ws_handle, ws_meta, data);
high_priority_service.after_apply();
} }
else if (ws_meta.flags() == 0) else if (ws_meta.flags() == 0)
{ {
@ -92,17 +93,22 @@ namespace
else else
{ {
wsrep::high_priority_switch sw(high_priority_service, *sa); wsrep::high_priority_switch sw(high_priority_service, *sa);
ret = sa->apply_write_set(data); ret = sa->apply_write_set(ws_meta, data);
sa->after_apply(); sa->after_apply();
} }
high_priority_service.log_dummy_write_set( ret = ret || high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta); ws_handle, ws_meta, data);
if (ret)
{
high_priority_service.rollback();
}
high_priority_service.after_apply();
} }
else if (wsrep::commits_transaction(ws_meta.flags())) else if (wsrep::commits_transaction(ws_meta.flags()))
{ {
if (high_priority_service.is_replaying()) if (high_priority_service.is_replaying())
{ {
ret = high_priority_service.apply_write_set(data) || ret = high_priority_service.apply_write_set(ws_meta, data) ||
high_priority_service.commit(ws_handle, ws_meta); high_priority_service.commit(ws_handle, ws_meta);
} }
else else
@ -129,6 +135,7 @@ namespace
{ {
wsrep::high_priority_switch sw( wsrep::high_priority_switch sw(
high_priority_service, *sa); high_priority_service, *sa);
sa->remove_fragments(ws_meta);
ret = sa->commit(ws_handle, ws_meta); ret = sa->commit(ws_handle, ws_meta);
sa->after_apply(); sa->after_apply();
} }

View File

@ -155,6 +155,12 @@ int wsrep::transaction::start_transaction(
return 0; return 0;
} }
void wsrep::transaction::fragment_applied(wsrep::seqno seqno)
{
assert(active());
streaming_context_.applied(seqno);
}
int wsrep::transaction::prepare_for_ordering( int wsrep::transaction::prepare_for_ordering(
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
@ -271,10 +277,8 @@ int wsrep::transaction::before_prepare(
} }
break; break;
case wsrep::client_state::m_high_priority: case wsrep::client_state::m_high_priority:
if (is_streaming()) // Note: fragment removal is done from applying
{ // context for high priority mode.
client_service_.remove_fragments();
}
break; break;
default: default:
assert(0); assert(0);
@ -909,8 +913,6 @@ int wsrep::transaction::certify_fragment(
ret = 1; ret = 1;
break; break;
} }
wsrep::log_info() << "Committing "
<< sr_ws_meta.transaction_id().get();
if (storage_service.commit(ws_handle_, sr_ws_meta)) if (storage_service.commit(ws_handle_, sr_ws_meta))
{ {
ret = 1; ret = 1;

View File

@ -26,6 +26,7 @@ void wsrep::mock_high_priority_service::adopt_transaction(
} }
int wsrep::mock_high_priority_service::apply_write_set( int wsrep::mock_high_priority_service::apply_write_set(
const wsrep::ws_meta&,
const wsrep::const_buffer&) const wsrep::const_buffer&)
{ {
assert(client_state_->toi_meta().seqno().is_undefined()); assert(client_state_->toi_meta().seqno().is_undefined());

View File

@ -32,9 +32,14 @@ namespace wsrep
WSREP_OVERRIDE; WSREP_OVERRIDE;
void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE; int apply_write_set(const wsrep::ws_meta&,
int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE;
const wsrep::const_buffer&) WSREP_OVERRIDE int append_fragment_and_commit(
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&) WSREP_OVERRIDE
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE
{ return 0; } { return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE; WSREP_OVERRIDE;