1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-08-05 04:01:12 +03:00

Extract before_prepare_local()/before_prepare_high_priority()

This commit is contained in:
Teemu Ollakka
2023-04-04 14:55:51 +03:00
parent 55a0bd6469
commit a36bae31d8
2 changed files with 114 additions and 88 deletions

View File

@@ -238,6 +238,8 @@ namespace wsrep
wsrep::provider& provider();
void flags(int flags) { flags_ = flags; }
int before_prepare_local(wsrep::unique_lock<wsrep::mutex>&);
int before_prepare_high_priority(wsrep::unique_lock<wsrep::mutex>&);
int before_commit_local(wsrep::unique_lock<wsrep::mutex>&);
int before_commit_high_priority(wsrep::unique_lock<wsrep::mutex>&);
// Return true if the transaction must abort, is aborting,

View File

@@ -225,6 +225,8 @@ int wsrep::transaction::prepare_for_ordering(
int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid)
{
assert(active());
assert(client_state_.mode() == wsrep::client_state::m_local);
try
{
return provider().assign_read_view(ws_handle_, gtid);
@@ -239,6 +241,7 @@ int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid)
int wsrep::transaction::append_key(const wsrep::key& key)
{
assert(active());
assert(client_state_.mode() == wsrep::client_state::m_local);
try
{
debug_log_key_append(key);
@@ -255,11 +258,13 @@ int wsrep::transaction::append_key(const wsrep::key& key)
int wsrep::transaction::append_data(const wsrep::const_buffer& data)
{
assert(active());
assert(client_state_.mode() == wsrep::client_state::m_local);
return provider().append_data(ws_handle_, data);
}
int wsrep::transaction::after_row()
{
assert(client_state_.mode() == wsrep::client_state::m_local);
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
debug_log_state("after_row_enter");
int ret(0);
@@ -272,106 +277,121 @@ int wsrep::transaction::after_row()
return ret;
}
int wsrep::transaction::before_prepare_local(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
assert(client_state_.mode() == wsrep::client_state::m_local);
assert(state() == s_executing || state() == s_must_abort ||
state() == s_replaying);
if (state() == s_must_abort)
{
client_state_.override_error(wsrep::e_deadlock_error);
return 1;
}
int ret = 0;
if (is_streaming())
{
client_service_.debug_crash(
"crash_last_fragment_commit_before_fragment_removal");
lock.unlock();
if (client_service_.statement_allowed_for_streaming() == false)
{
client_state_.override_error(wsrep::e_error_during_commit,
wsrep::provider::error_not_allowed);
ret = 1;
}
else if (!is_xa())
{
// Note: we can't remove fragments here for XA,
// the transaction has already issued XA END and
// is in IDLE state, no more changes allowed!
ret = client_service_.remove_fragments();
if (ret)
{
client_state_.override_error(wsrep::e_deadlock_error);
}
}
lock.lock();
client_service_.debug_crash(
"crash_last_fragment_commit_after_fragment_removal");
if (state() == s_must_abort)
{
client_state_.override_error(wsrep::e_deadlock_error);
ret = 1;
}
}
if (ret == 0)
{
if (is_xa())
{
// Force fragment replication on XA prepare
flags(flags() | wsrep::provider::flag::prepare);
pa_unsafe(true);
append_sr_keys_for_commit();
const bool force_streaming_step = true;
ret = streaming_step(lock, force_streaming_step);
if (ret == 0)
{
assert(state() == s_executing);
state(lock, s_preparing);
}
}
else
{
ret = certify_commit(lock);
}
assert((ret == 0 && state() == s_preparing)
|| (state() == s_must_abort || state() == s_must_replay
|| state() == s_cert_failed));
if (ret)
{
assert(state() == s_must_replay || client_state_.current_error());
ret = 1;
}
}
return ret;
}
int wsrep::transaction::before_prepare_high_priority(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
assert(client_state_.mode() == wsrep::client_state::m_high_priority);
assert(state() == s_executing || state() == s_replaying);
// Note: fragment removal is done from applying
// context for high priority mode.
if (is_xa())
{
assert(state() == s_executing || state() == s_replaying);
if (state() == s_replaying)
{
return 0;
}
}
state(lock, s_preparing);
return 0;
}
int wsrep::transaction::before_prepare(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
int ret(0);
debug_log_state("before_prepare_enter");
assert(state() == s_executing || state() == s_must_abort ||
state() == s_replaying);
if (state() == s_must_abort)
{
assert(client_state_.mode() == wsrep::client_state::m_local);
client_state_.override_error(wsrep::e_deadlock_error);
return 1;
}
switch (client_state_.mode())
{
case wsrep::client_state::m_local:
if (is_streaming())
{
client_service_.debug_crash(
"crash_last_fragment_commit_before_fragment_removal");
lock.unlock();
if (client_service_.statement_allowed_for_streaming() == false)
{
client_state_.override_error(
wsrep::e_error_during_commit,
wsrep::provider::error_not_allowed);
ret = 1;
}
else if (!is_xa())
{
// Note: we can't remove fragments here for XA,
// the transaction has already issued XA END and
// is in IDLE state, no more changes allowed!
ret = client_service_.remove_fragments();
if (ret)
{
client_state_.override_error(wsrep::e_deadlock_error);
}
}
lock.lock();
client_service_.debug_crash(
"crash_last_fragment_commit_after_fragment_removal");
if (state() == s_must_abort)
{
client_state_.override_error(wsrep::e_deadlock_error);
ret = 1;
}
}
if (ret == 0)
{
if (is_xa())
{
// Force fragment replication on XA prepare
flags(flags() | wsrep::provider::flag::prepare);
pa_unsafe(true);
append_sr_keys_for_commit();
const bool force_streaming_step = true;
ret = streaming_step(lock, force_streaming_step);
if (ret == 0)
{
assert(state() == s_executing);
state(lock, s_preparing);
}
}
else
{
ret = certify_commit(lock);
}
assert((ret == 0 && state() == s_preparing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
if (ret)
{
assert(state() == s_must_replay ||
client_state_.current_error());
ret = 1;
}
}
ret = before_prepare_local(lock);
break;
case wsrep::client_state::m_high_priority:
// Note: fragment removal is done from applying
// context for high priority mode.
if (is_xa())
{
assert(state() == s_executing ||
state() == s_replaying);
if (state() == s_replaying)
{
break;
}
}
state(lock, s_preparing);
ret = before_prepare_high_priority(lock);
break;
default:
assert(0);
@@ -441,6 +461,8 @@ int wsrep::transaction::after_prepare(
int wsrep::transaction::before_commit_local(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
int ret = 1;
assert(state() == s_executing || state() == s_prepared
|| state() == s_committing || state() == s_must_abort
@@ -512,6 +534,8 @@ int wsrep::transaction::before_commit_local(
int wsrep::transaction::before_commit_high_priority(
wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
int ret = 1;
assert(certified());
assert(ordered());