1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-31 18:24:25 +03:00

The changes:

* Removed WITH_WSREP_SR_SPEEDUP_REPLAY and DEBUG_SR_SPEEDUP_REPLAY
 * Added "--wsrep_SR_store=undolog" for enabling SR speedup
 * Modified MTR tests "galera_sr.GCF-572" and
   "galera_sr.galera_sr_cc_master"
 * Improved the code that removes unneeded fragments from
   binlog_fragment_cache.

Now all galera_sr tests succeed when SR speedup is disabled
and most galera_sr MTR tests succeed when SR speedup up is enabled.
The failing tests are listed in "skipped_tests" file.

The following MTR test suite run succeeds:

(cd mysql-test; ./mysql-test-run.pl --mysqld="--wsrep_SR_store=undolog"
                 --suite=galera_sr --skip-test-list=../skipped_tests --force)
This commit is contained in:
Pekka Lampio
2021-05-10 13:54:41 +03:00
parent df55a53c35
commit 4225ee526b
7 changed files with 57 additions and 24 deletions

View File

@ -53,15 +53,13 @@ option(WSREP_LIB_MAINTAINER_MODE "Fail compilation on any warnings" OFF)
#
# Option for WSREP's Streaming Replication speedup
#
option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ON)
option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ${WITH_WSREP})
#option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" OFF)
## add_definitions(-DWITH_WSREP_SR_SPEEDUP_CHECK)
#option(WITH_WSREP_SR_SPEEDUP_TIME "Timing for Streaming Replication speedup" ON)
if (WITH_WSREP_SR_SPEEDUP)
ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP)
ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP)
# ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP_REPLAY)
# ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP_REPLAY)
endif()

View File

@ -217,8 +217,8 @@ namespace wsrep
* been enabled.
*/
virtual void debug_crash(const char* crash_point) = 0;
#ifdef WITH_WSREP_SR_SPEEDUP
/**
* Return the binlog cache for the currently execution
* transaction or a NULL pointer if no such cache exists.

View File

@ -98,6 +98,9 @@ namespace wsrep
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
#ifdef WITH_WSREP_SR_SPEEDUP
int sr_store,
#endif /* WITH_WSREP_SR_SPEEDUP */
const wsrep::xid& xid) = 0;
/**

View File

@ -65,13 +65,14 @@ namespace wsrep
wsrep::transaction_id client_id,
int flags,
const wsrep::const_buffer& data,
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
#ifdef WITH_WSREP_SR_SPEEDUP
int sr_store,
size_t offset,
const wsrep::xid& xid,
void *binlog_cache) = 0;
#else
const wsrep::xid& xid) = 0;
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
#endif /* WITH_WSREP_SR_SPEEDUP */
/**
* Update fragment meta data after certification process.
*/
@ -82,8 +83,8 @@ namespace wsrep
* adopted a transaction prior this call.
*/
virtual int remove_fragments() = 0;
#ifdef WITH_WSREP_SR_SPEEDUP
int set_fragments_from_table() {return 0;};
#endif /* WITH_WSREP_SR_SPEEDUP */

View File

@ -189,6 +189,17 @@ namespace wsrep
unit_counter_ = 0;
log_position_ = 0;
}
#ifdef WITH_WSREP_SR_SPEEDUP
void set_sr_store(int store_type)
{
sr_store_ = store_type;
}
int get_sr_store() const
{
return (sr_store_);
}
#endif /* WITH_WSREP_SR_SPEEDUP */
private:
void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
@ -204,6 +215,9 @@ namespace wsrep
size_t fragment_size_;
size_t unit_counter_;
size_t log_position_;
#ifdef WITH_WSREP_SR_SPEEDUP
int sr_store_;
#endif /* WITH_WSREP_SR_SPEEDUP */
};
}

View File

@ -92,6 +92,10 @@ static int apply_fragment(wsrep::server_state& server_state,
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
#ifdef WITH_WSREP_SR_SPEEDUP
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
#endif /* WITH_WSREP_SR_SPEEDUP */
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "server_state: apply_fragment, line = " << line;
#endif /* DEBUG_SR_SPEEDUP */
@ -101,7 +105,10 @@ static int apply_fragment(wsrep::server_state& server_state,
apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "server_state: apply_fragment, line = "
<< __LINE__ << ", apply_err = " << apply_err;
<< __LINE__ << ", apply_err = " << apply_err
<< ", sr_store = "
<< streaming_applier->transaction(
).streaming_context().get_sr_store();
#endif /* DEBUG_SR_SPEEDUP */
if (!apply_err)
{
@ -142,7 +149,11 @@ static int apply_fragment(wsrep::server_state& server_state,
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
const wsrep::xid xid(streaming_applier->transaction().xid());
ret = high_priority_service.append_fragment_and_commit(
#ifdef WITH_WSREP_SR_SPEEDUP
ws_handle, ws_meta, data, sr_store, xid);
#else
ws_handle, ws_meta, data, xid);
#endif /* WITH_WSREP_SR_SPEEDUP */
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}

View File

@ -29,10 +29,12 @@
#include <sstream>
#include <memory>
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
#ifdef WITH_WSREP_SR_SPEEDUP
extern void *wsrep_get_current_thd();
extern int fragment_cache_remove_transaction(
const wsrep::id&, wsrep::transaction_id);
#endif /* WITH_WSREP_SR_SPEEDUP */
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
namespace
{
class storage_service_deleter
@ -1505,9 +1507,6 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
int wsrep::transaction::certify_fragment(
wsrep::unique_lock<wsrep::mutex>& lock)
{
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
void *current_thd = wsrep_get_current_thd();
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "BEGIN certify_fragment";
#endif /* DEBUG_SR_SPEEDUP */
@ -1538,7 +1537,7 @@ int wsrep::transaction::certify_fragment(
}
streaming_context_.set_log_position(log_position);
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
#ifdef WITH_WSREP_SR_SPEEDUP
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "certify_fragment, line = " << __LINE__
<< ", log_position = " << log_position
@ -1546,7 +1545,7 @@ int wsrep::transaction::certify_fragment(
<< ", wsrep_get_current_thd() = "
<< wsrep_get_current_thd();
#endif /* DEBUG_SR_SPEEDUP */
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
#endif /* WITH_WSREP_SR_SPEEDUP */
if (data.size() == 0)
{
wsrep::log_warning() << "Attempt to replicate empty data buffer";
@ -1619,17 +1618,17 @@ int wsrep::transaction::certify_fragment(
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY
#ifdef WITH_WSREP_SR_SPEEDUP
streaming_context_.get_sr_store(),
log_position - data.size(),
xid(), get_binlog_cache()))
#else
xid()))
#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */
#endif /* WITH_WSREP_SR_SPEEDUP */
{
ret = 1;
error = wsrep::e_append_fragment_error;
}
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "certify_fragment, line = " << __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
@ -1668,7 +1667,6 @@ int wsrep::transaction::certify_fragment(
else
{
streaming_context_.stored(sr_ws_meta.seqno());
}
client_service_.debug_crash(
"crash_replicate_fragment_success");
@ -2059,6 +2057,10 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
}
if (is_streaming())
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "before clear_fragments, line = "
<< __LINE__;
#endif /* DEBUG_SR_SPEEDUP */
clear_fragments();
}
provider().release(ws_handle_);
@ -2092,9 +2094,13 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
void wsrep::transaction::clear_fragments()
{
#ifdef DEBUG_SR_SPEEDUP
wsrep::log_info() << "END: wsrep::transaction::clear_fragments";
wsrep::log_info() << "END: wsrep::transaction::clear_fragments: id = "
<< id_ << ", server = |" << server_id_ << "|";
#endif /* DEBUG_SR_SPEEDUP */
streaming_context_.cleanup();
#ifdef WITH_WSREP_SR_SPEEDUP
fragment_cache_remove_transaction(server_id_, id_);
#endif /* WITH_WSREP_SR_SPEEDUP */
}
void wsrep::transaction::cleanup()