1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

The initial version of the SR speedup feature.

This version contains many debug messages and does not work very well.
This commit is contained in:
Pekka Lampio
2021-02-05 18:52:50 +02:00
parent a93955ddee
commit a866ce7107
5 changed files with 166 additions and 8 deletions

View File

@ -50,6 +50,21 @@ option(WSREP_LIB_WITH_COVERAGE "Compile with coverage instrumentation" OFF)
option(WSREP_LIB_STRICT_BUILD_FLAGS "Compile with strict build flags" OFF)
option(WSREP_LIB_MAINTAINER_MODE "Fail compilation on any warnings" OFF)
#
# Option for WSREP's Streaming Replication speedup
#
option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ON)
#option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" OFF)
## add_definitions(-DWITH_WSREP_SR_SPEEDUP_CHECK)
#option(WITH_WSREP_SR_SPEEDUP_TIME "Timing for Streaming Replication speedup" ON)
if (WITH_WSREP_SR_SPEEDUP)
ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP)
ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP)
# ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP_REPLAY)
# ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP_REPLAY)
endif()
# Compiler options
# Set std to C++0x/C++11 if superproject has not set standard yet

View File

@ -100,6 +100,8 @@ namespace wsrep
std::ostream& operator<<(std::ostream&, const wsrep::id& id);
std::istream& operator>>(std::istream&, wsrep::id& id);
}
#endif // WSREP_ID_HPP

View File

@ -65,8 +65,12 @@ namespace wsrep
wsrep::transaction_id client_id,
int flags,
const wsrep::const_buffer& data,
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
size_t offset,
const wsrep::xid& xid, void * current_thd) = 0;
#else
const wsrep::xid& xid) = 0;
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
/**
* Update fragment meta data after certification process.
*/

View File

@ -87,11 +87,14 @@ static int apply_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
const wsrep::const_buffer& data, int line)
{
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "server_state: apply_fragment, line = " << line;
#endif /* DEBUG_SR_SPEEDUP */
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
@ -357,7 +360,7 @@ static int apply_write_set(wsrep::server_state& server_state,
sa,
ws_handle,
ws_meta,
data);
data, __LINE__);
}
else if (ws_meta.flags() == 0 || wsrep::prepares_transaction(ws_meta.flags()))
{
@ -386,7 +389,7 @@ static int apply_write_set(wsrep::server_state& server_state,
sa,
ws_handle,
ws_meta,
data);
data, __LINE__);
}
}
else if (wsrep::commits_transaction(ws_meta.flags()))
@ -1145,6 +1148,10 @@ void wsrep::server_state::start_streaming_client(
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_server_state,
"Start streaming client: " << client_state->id());
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "server_state: start_streaming_client";
#endif /* DEBUG_SR_SPEEDUP */
if (streaming_clients_.insert(
std::make_pair(client_state->id(), client_state)).second == false)
{
@ -1246,6 +1253,9 @@ void wsrep::server_state::start_streaming_applier(
wsrep::high_priority_service* sa)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
#ifdef DEBUG_SR_SPEEDUP2
wsrep::log_warning() << "start_streaming_applier";
#endif /* DEBUG_SR_SPEEDUP2 */
if (streaming_appliers_.insert(
std::make_pair(std::make_pair(server_id, transaction_id),
sa)).second == false)

View File

@ -29,6 +29,10 @@
#include <sstream>
#include <memory>
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
extern void *wsrep_get_current_thd();
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
namespace
{
class storage_service_deleter
@ -129,6 +133,10 @@ int wsrep::transaction::start_transaction(
state_hist_.clear();
ws_handle_ = wsrep::ws_handle(id);
flags(wsrep::provider::flag::start_transaction);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__
<< ": id = " << id.get();
#endif /* DEBUG_SR_SPEEDUP */
switch (client_state_.mode())
{
case wsrep::client_state::m_high_priority:
@ -175,6 +183,10 @@ int wsrep::transaction::start_transaction(
certified_ = true;
}
debug_log_state("start_transaction leave");
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__
<< ": id = " << id().get();
#endif /* DEBUG_SR_SPEEDUP */
return 0;
}
@ -250,6 +262,9 @@ int wsrep::transaction::append_key(const wsrep::key& key)
int wsrep::transaction::append_data(const wsrep::const_buffer& data)
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << __FUNCTION__ << ": size = " << data.size();
#endif /* DEBUG_SR_SPEEDUP */
return provider().append_data(ws_handle_, data);
}
@ -262,6 +277,9 @@ int wsrep::transaction::after_row()
if (streaming_context_.fragment_size() &&
streaming_context_.fragment_unit() != streaming_context::statement)
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << __FUNCTION__;
#endif /* DEBUG_SR_SPEEDUP */
ret = streaming_step(lock);
}
debug_log_state("after_row_leave");
@ -304,6 +322,10 @@ int wsrep::transaction::before_prepare(
// Note: we can't remove fragments here for XA,
// the transaction has already issued XA END and
// is in IDLE state, no more changes allowed!
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "before remove_fragments, line = "
<< __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
ret = client_service_.remove_fragments();
if (ret)
{
@ -329,6 +351,9 @@ int wsrep::transaction::before_prepare(
flags(flags() | wsrep::provider::flag::pa_unsafe);
append_sr_keys_for_commit();
const bool force_streaming_step = true;
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << __FUNCTION__ << ", line = " << __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
ret = streaming_step(lock, force_streaming_step);
if (ret == 0)
{
@ -339,6 +364,10 @@ int wsrep::transaction::before_prepare(
else
{
ret = certify_commit(lock);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "after certify_commit(): line = "
<< __LINE__ << ", ret = " << ret;
#endif /* DEBUG_SR_SPEEDUP */
}
assert((ret == 0 && state() == s_preparing) ||
@ -485,6 +514,10 @@ int wsrep::transaction::before_commit()
if (ret == 0 && state() == s_prepared)
{
ret = certify_commit(lock);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "after certify_commit(): line = "
<< __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
assert((ret == 0 && state() == s_committing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
@ -589,6 +622,10 @@ int wsrep::transaction::ordered_commit()
{
state(lock, s_ordered_commit);
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__
<< ": id = " << id().get() << ", --> " << ret;
#endif /* DEBUG_SR_SPEEDUP */
debug_log_state("ordered_commit_leave");
return ret;
}
@ -619,6 +656,10 @@ int wsrep::transaction::after_commit()
wsrep::storage_service& storage_service(
sr_scope.storage_service());
storage_service.adopt_transaction(*this);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "before remove_fragments, line = "
<< __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
storage_service.remove_fragments();
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
lock.lock();
@ -654,6 +695,10 @@ int wsrep::transaction::after_commit()
int wsrep::transaction::before_rollback()
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "BEGIN: wsrep::transaction::before_rollback: id = "
<< id().get();
#endif /* DEBUG_SR_SPEEDUP */
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
debug_log_state("before_rollback_enter");
assert(state() == s_executing ||
@ -736,11 +781,18 @@ int wsrep::transaction::before_rollback()
}
debug_log_state("before_rollback_leave");
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END: wsrep::transaction::before_rollback";
#endif /* DEBUG_SR_SPEEDUP */
return 0;
}
int wsrep::transaction::after_rollback()
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "BEGIN: wsrep::transaction::after_rollback: id = "
<< id().get();
#endif /* DEBUG_SR_SPEEDUP */
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
debug_log_state("after_rollback_enter");
assert(state() == s_aborting ||
@ -759,6 +811,10 @@ int wsrep::transaction::after_rollback()
wsrep::storage_service& storage_service(
sr_scope.storage_service());
storage_service.adopt_transaction(*this);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "before remove_fragments, line = "
<< __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
storage_service.remove_fragments();
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
}
@ -783,6 +839,9 @@ int wsrep::transaction::after_rollback()
// releasing the commit ordering critical section should be
// also postponed until all resources have been released.
debug_log_state("after_rollback_leave");
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END: wsrep::transaction::after_rollback";
#endif /* DEBUG_SR_SPEEDUP */
return 0;
}
@ -819,6 +878,9 @@ int wsrep::transaction::after_statement()
streaming_context_.fragment_size() &&
streaming_context_.fragment_unit() == streaming_context::statement)
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << __FUNCTION__ << ", line = " << __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
ret = streaming_step(lock);
}
@ -1345,6 +1407,14 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
assert(lock.owns_lock());
assert(streaming_context_.fragment_size() || is_xa());
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << __FUNCTION__
<< ": bytes_generated = "
<< client_service_.bytes_generated()
<< ", log_position = "
<< streaming_context_.log_position();
#endif /* DEBUG_SR_SPEEDUP */
if (client_service_.bytes_generated() <
streaming_context_.log_position())
{
@ -1388,6 +1458,10 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
if (streaming_context_.fragment_size_exceeded() || force)
{
streaming_context_.reset_unit_counter();
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << __FUNCTION__ << ": bytes_to_replicate = "
<< bytes_to_replicate;
#endif /* DEBUG_SR_SPEEDUP */
ret = certify_fragment(lock);
}
@ -1397,6 +1471,12 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
int wsrep::transaction::certify_fragment(
wsrep::unique_lock<wsrep::mutex>& lock)
{
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
void *current_thd = wsrep_get_current_thd();
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "BEGIN certify_fragment";
#endif /* DEBUG_SR_SPEEDUP */
assert(lock.owns_lock());
assert(client_state_.mode() == wsrep::client_state::m_local);
@ -1424,6 +1504,15 @@ int wsrep::transaction::certify_fragment(
}
streaming_context_.set_log_position(log_position);
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "certify_fragment, line = " << __LINE__
<< ", log_position = " << log_position
<< ", data size = " << data.size()
<< ", wsrep_get_current_thd() = "
<< wsrep_get_current_thd();
#endif /* DEBUG_SR_SPEEDUP */
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
if (data.size() == 0)
{
wsrep::log_warning() << "Attempt to replicate empty data buffer";
@ -1432,6 +1521,9 @@ int wsrep::transaction::certify_fragment(
return 0;
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
if (provider().append_data(ws_handle_,
wsrep::const_buffer(data.data(), data.size())))
{
@ -1440,6 +1532,9 @@ int wsrep::transaction::certify_fragment(
client_state_.override_error(wsrep::e_error_during_commit);
return 1;
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
if (is_xa())
{
@ -1490,12 +1585,20 @@ int wsrep::transaction::certify_fragment(
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
log_position - data.size(),
xid(), current_thd))
#else
xid()))
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
{
ret = 1;
error = wsrep::e_append_fragment_error;
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
if (ret == 0)
{
client_service_.debug_crash(
@ -1620,6 +1723,9 @@ int wsrep::transaction::certify_fragment(
state(lock, s_executing);
flags(flags() & ~wsrep::provider::flag::start_transaction);
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END certify_fragment";
#endif /* DEBUG_SR_SPEEDUP */
return ret;
}
@ -1630,6 +1736,9 @@ int wsrep::transaction::certify_commit(
assert(active());
client_service_.wait_for_replayers(lock);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "BEGIN certify_commit";
#endif /* DEBUG_SR_SPEEDUP */
assert(lock.owns_lock());
if (abort_or_interrupt(lock))
@ -1805,6 +1914,11 @@ int wsrep::transaction::certify_commit(
break;
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END certify_commit: trx_id = "
<< id().get()
<< ", cert_ret = " << cert_ret;
#endif /* DEBUG_SR_SPEEDUP */
return ret;
}
@ -1831,6 +1945,9 @@ int wsrep::transaction::append_sr_keys_for_commit()
void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lock)
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "BEGIN: wsrep::transaction::streaming_rollback";
#endif /* DEBUG_SR_SPEEDUP */
debug_log_state("streaming_rollback enter");
assert(state_ != s_must_replay);
assert(is_streaming());
@ -1874,6 +1991,9 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lo
}
}
debug_log_state("streaming_rollback leave");
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END: wsrep::transaction::streaming_rollback";
#endif /* DEBUG_SR_SPEEDUP */
}
int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
@ -1913,6 +2033,10 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
wsrep::e_deadlock_error);
if (is_streaming())
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "before remove_fragments, line = "
<< __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
client_service_.remove_fragments();
clear_fragments();
}
@ -1932,6 +2056,9 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
void wsrep::transaction::clear_fragments()
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END: wsrep::transaction::clear_fragments";
#endif /* DEBUG_SR_SPEEDUP */
streaming_context_.cleanup();
}