mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-24 10:42:31 +03:00
Provider write set handle and meta data for high priority commit
The write set handle and meta data are needed for SR transactions where the commit context is not known when the transaction starts. The passed handle and meta data can be set through client_state prepare_for_ordering() call before performing commit.
This commit is contained in:
@ -39,7 +39,8 @@ int db::high_priority_service::apply_toi(
|
||||
throw wsrep::not_implemented_error();
|
||||
}
|
||||
|
||||
int db::high_priority_service::commit()
|
||||
int db::high_priority_service::commit(const wsrep::ws_handle&,
|
||||
const wsrep::ws_meta&)
|
||||
{
|
||||
int ret(client_.client_state_.before_commit());
|
||||
if (ret == 0) client_.se_trx_.commit();
|
||||
|
@ -22,7 +22,7 @@ namespace db
|
||||
int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&)
|
||||
override
|
||||
{ return 0; }
|
||||
int commit() override;
|
||||
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
|
||||
int rollback() override;
|
||||
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override;
|
||||
void after_apply() override;
|
||||
|
@ -313,13 +313,20 @@ namespace wsrep
|
||||
void disable_streaming();
|
||||
|
||||
/**
|
||||
* Prepare write set meta data for fragment storage ordering.
|
||||
* This method should be called from storage service commit
|
||||
* or rollback before performing the operation.
|
||||
* Prepare write set meta data for ordering.
|
||||
* This method should be called before ordered commit or
|
||||
* rollback if the commit time meta data was not known
|
||||
* at the time of the start of the transaction.
|
||||
* This mostly applies to streaming replication.
|
||||
*
|
||||
* @param ws_handle Write set handle
|
||||
* @param ws_meta Write set meta data
|
||||
* @param is_commit Boolean to denote whether the operation
|
||||
* is commit or rollback.
|
||||
*/
|
||||
int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta,
|
||||
bool is_commit)
|
||||
int prepare_for_ordering(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta,
|
||||
bool is_commit)
|
||||
{
|
||||
assert(state_ == s_exec);
|
||||
return transaction_.prepare_for_fragment_ordering(
|
||||
|
@ -59,7 +59,7 @@ namespace wsrep
|
||||
/**
|
||||
* Commit a transaction.
|
||||
*/
|
||||
virtual int commit() = 0;
|
||||
virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0;
|
||||
|
||||
/**
|
||||
* Roll back a transaction
|
||||
@ -98,7 +98,18 @@ namespace wsrep
|
||||
virtual void switch_execution_context(
|
||||
wsrep::high_priority_service& orig_hps) = 0;
|
||||
|
||||
virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0;
|
||||
/**
|
||||
* Log a dummy write set which is either SR transaction fragment
|
||||
* or roll back fragment. The implementation must release
|
||||
* commit order inside the call.
|
||||
*
|
||||
* @params ws_handle Write set handle
|
||||
* @params ws_meta Write set meta data
|
||||
*
|
||||
* @return Zero in case of success, non-zero on failure
|
||||
*/
|
||||
virtual int log_dummy_write_set(const ws_handle& ws_handle,
|
||||
const ws_meta& ws_meta) = 0;
|
||||
|
||||
virtual bool is_replaying() const = 0;
|
||||
|
||||
|
@ -46,7 +46,7 @@ namespace
|
||||
{
|
||||
ret = 1;
|
||||
}
|
||||
else if (high_priority_service.commit())
|
||||
else if (high_priority_service.commit(ws_handle, ws_meta))
|
||||
{
|
||||
ret = 1;
|
||||
}
|
||||
@ -103,7 +103,7 @@ namespace
|
||||
if (high_priority_service.is_replaying())
|
||||
{
|
||||
ret = high_priority_service.apply_write_set(data) ||
|
||||
high_priority_service.commit();
|
||||
high_priority_service.commit(ws_handle, ws_meta);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -129,7 +129,7 @@ namespace
|
||||
{
|
||||
wsrep::high_priority_switch sw(
|
||||
high_priority_service, *sa);
|
||||
ret = sa->commit();
|
||||
ret = sa->commit(ws_handle, ws_meta);
|
||||
sa->after_apply();
|
||||
}
|
||||
server_state.stop_streaming_applier(
|
||||
|
@ -134,9 +134,10 @@ int wsrep::transaction::start_transaction(
|
||||
const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
debug_log_state("start_transaction enter");
|
||||
if (state() != s_replaying)
|
||||
{
|
||||
assert(ws_meta.flags());
|
||||
// assert(ws_meta.flags());
|
||||
assert(active() == false);
|
||||
id_ = ws_meta.transaction_id();
|
||||
assert(client_state_.mode() == wsrep::client_state::m_high_priority);
|
||||
@ -150,6 +151,7 @@ int wsrep::transaction::start_transaction(
|
||||
{
|
||||
start_replaying(ws_meta);
|
||||
}
|
||||
debug_log_state("start_transaction leave");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -553,6 +555,7 @@ int wsrep::transaction::after_statement()
|
||||
int ret(0);
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
||||
debug_log_state("after_statement_enter");
|
||||
assert(client_state_.mode() == wsrep::client_state::m_local);
|
||||
assert(state() == s_executing ||
|
||||
state() == s_committed ||
|
||||
state() == s_aborted ||
|
||||
@ -669,6 +672,23 @@ void wsrep::transaction::after_applying()
|
||||
assert(state_ == s_executing ||
|
||||
state_ == s_committed ||
|
||||
state_ == s_aborted);
|
||||
|
||||
// We may enter here from either high priority applier or
|
||||
// from fragment storage service. High priority applier
|
||||
// should always have set up meta data for ordering, but
|
||||
// fragment storage service operation may be rolled back
|
||||
// before the fragment is ordered and certified.
|
||||
// Therefore we need to check separately if the ordering has
|
||||
// been done.
|
||||
if (state_ == s_aborted && ordered())
|
||||
{
|
||||
int ret(provider().commit_order_enter(ws_handle_, ws_meta_));
|
||||
if (ret == 0)
|
||||
{
|
||||
provider().commit_order_leave(ws_handle_, ws_meta_);
|
||||
}
|
||||
}
|
||||
|
||||
if (state_ != s_executing)
|
||||
{
|
||||
cleanup();
|
||||
|
@ -34,7 +34,8 @@ int wsrep::mock_high_priority_service::apply_write_set(
|
||||
return (fail_next_applying_ ? 1 : 0);
|
||||
}
|
||||
|
||||
int wsrep::mock_high_priority_service::commit()
|
||||
int wsrep::mock_high_priority_service::commit(const wsrep::ws_handle&,
|
||||
const wsrep::ws_meta&)
|
||||
{
|
||||
|
||||
int ret(0);
|
||||
|
@ -36,7 +36,8 @@ namespace wsrep
|
||||
int append_fragment(const wsrep::ws_meta&,
|
||||
const wsrep::const_buffer&) WSREP_OVERRIDE
|
||||
{ return 0; }
|
||||
int commit() WSREP_OVERRIDE;
|
||||
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
|
||||
WSREP_OVERRIDE;
|
||||
int rollback() WSREP_OVERRIDE;
|
||||
int apply_toi(const wsrep::ws_meta&,
|
||||
const wsrep::const_buffer&) WSREP_OVERRIDE;
|
||||
|
@ -36,7 +36,7 @@ int wsrep::mock_storage_service::start_transaction(const wsrep::ws_handle& ws_ha
|
||||
int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
int ret(client_state_.prepare_for_fragment_ordering(
|
||||
int ret(client_state_.prepare_for_ordering(
|
||||
ws_handle, ws_meta, true) ||
|
||||
client_state_.before_commit() ||
|
||||
client_state_.ordered_commit() ||
|
||||
@ -48,7 +48,7 @@ int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle,
|
||||
int wsrep::mock_storage_service::rollback(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
int ret(client_state_.prepare_for_fragment_ordering(
|
||||
int ret(client_state_.prepare_for_ordering(
|
||||
ws_handle, ws_meta, false) ||
|
||||
client_state_.before_rollback() ||
|
||||
client_state_.after_rollback());
|
||||
|
Reference in New Issue
Block a user