diff --git a/CMakeLists.txt b/CMakeLists.txt index 452186e..f3d1490 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,15 +53,13 @@ option(WSREP_LIB_MAINTAINER_MODE "Fail compilation on any warnings" OFF) # # Option for WSREP's Streaming Replication speedup # -option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ON) +option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ${WITH_WSREP}) #option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" OFF) ## add_definitions(-DWITH_WSREP_SR_SPEEDUP_CHECK) #option(WITH_WSREP_SR_SPEEDUP_TIME "Timing for Streaming Replication speedup" ON) if (WITH_WSREP_SR_SPEEDUP) ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP) ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP) -# ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP_REPLAY) -# ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP_REPLAY) endif() diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 93cf70d..78d097d 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -217,8 +217,8 @@ namespace wsrep * been enabled. */ virtual void debug_crash(const char* crash_point) = 0; - #ifdef WITH_WSREP_SR_SPEEDUP + /** * Return the binlog cache for the currently execution * transaction or a NULL pointer if no such cache exists. diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 573f5e4..99567c7 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -98,6 +98,9 @@ namespace wsrep const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store, +#endif /* WITH_WSREP_SR_SPEEDUP */ const wsrep::xid& xid) = 0; /** diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index 11d4e71..9dcfb3a 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -65,13 +65,14 @@ namespace wsrep wsrep::transaction_id client_id, int flags, const wsrep::const_buffer& data, -#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store, size_t offset, const wsrep::xid& xid, void *binlog_cache) = 0; #else const wsrep::xid& xid) = 0; -#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */ +#endif /* WITH_WSREP_SR_SPEEDUP */ /** * Update fragment meta data after certification process. */ @@ -82,8 +83,8 @@ namespace wsrep * adopted a transaction prior this call. */ virtual int remove_fragments() = 0; - #ifdef WITH_WSREP_SR_SPEEDUP + int set_fragments_from_table() {return 0;}; #endif /* WITH_WSREP_SR_SPEEDUP */ diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 9b205c5..39af163 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -189,6 +189,17 @@ namespace wsrep unit_counter_ = 0; log_position_ = 0; } +#ifdef WITH_WSREP_SR_SPEEDUP + void set_sr_store(int store_type) + { + sr_store_ = store_type; + } + + int get_sr_store() const + { + return (sr_store_); + } +#endif /* WITH_WSREP_SR_SPEEDUP */ private: void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) @@ -204,6 +215,9 @@ namespace wsrep size_t fragment_size_; size_t unit_counter_; size_t log_position_; +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store_; +#endif /* WITH_WSREP_SR_SPEEDUP */ }; } diff --git a/src/server_state.cpp b/src/server_state.cpp index 57a6171..0fddc35 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -92,6 +92,10 @@ static int apply_fragment(wsrep::server_state& server_state, int ret(0); int apply_err; wsrep::mutable_buffer err; +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store = streaming_applier->transaction().streaming_context(). + get_sr_store(); +#endif /* WITH_WSREP_SR_SPEEDUP */ #ifdef DEBUG_SR_SPEEDUP wsrep::log_info() << "server_state: apply_fragment, line = " << line; #endif /* DEBUG_SR_SPEEDUP */ @@ -101,7 +105,10 @@ static int apply_fragment(wsrep::server_state& server_state, apply_err = streaming_applier->apply_write_set(ws_meta, data, err); #ifdef DEBUG_SR_SPEEDUP wsrep::log_info() << "server_state: apply_fragment, line = " - << __LINE__ << ", apply_err = " << apply_err; + << __LINE__ << ", apply_err = " << apply_err + << ", sr_store = " + << streaming_applier->transaction( + ).streaming_context().get_sr_store(); #endif /* DEBUG_SR_SPEEDUP */ if (!apply_err) { @@ -142,7 +149,11 @@ static int apply_fragment(wsrep::server_state& server_state, high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); const wsrep::xid xid(streaming_applier->transaction().xid()); ret = high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data, xid); +#ifdef WITH_WSREP_SR_SPEEDUP + ws_handle, ws_meta, data, sr_store, xid); +#else + ws_handle, ws_meta, data, xid); +#endif /* WITH_WSREP_SR_SPEEDUP */ high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); ret = ret || (high_priority_service.after_apply(), 0); } diff --git a/src/transaction.cpp b/src/transaction.cpp index 139740c..2cf0b2f 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -29,10 +29,12 @@ #include #include -#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY +#ifdef WITH_WSREP_SR_SPEEDUP extern void *wsrep_get_current_thd(); +extern int fragment_cache_remove_transaction( + const wsrep::id&, wsrep::transaction_id); +#endif /* WITH_WSREP_SR_SPEEDUP */ -#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */ namespace { class storage_service_deleter @@ -1505,9 +1507,6 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, int wsrep::transaction::certify_fragment( wsrep::unique_lock& lock) { -#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY - void *current_thd = wsrep_get_current_thd(); -#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */ #ifdef DEBUG_SR_SPEEDUP wsrep::log_info() << "BEGIN certify_fragment"; #endif /* DEBUG_SR_SPEEDUP */ @@ -1538,7 +1537,7 @@ int wsrep::transaction::certify_fragment( } streaming_context_.set_log_position(log_position); -#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY +#ifdef WITH_WSREP_SR_SPEEDUP #ifdef DEBUG_SR_SPEEDUP wsrep::log_info() << "certify_fragment, line = " << __LINE__ << ", log_position = " << log_position @@ -1546,7 +1545,7 @@ int wsrep::transaction::certify_fragment( << ", wsrep_get_current_thd() = " << wsrep_get_current_thd(); #endif /* DEBUG_SR_SPEEDUP */ -#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */ +#endif /* WITH_WSREP_SR_SPEEDUP */ if (data.size() == 0) { wsrep::log_warning() << "Attempt to replicate empty data buffer"; @@ -1619,17 +1618,17 @@ int wsrep::transaction::certify_fragment( id(), flags(), wsrep::const_buffer(data.data(), data.size()), -#ifdef WITH_WSREP_SR_SPEEDUP_REPLAY +#ifdef WITH_WSREP_SR_SPEEDUP + streaming_context_.get_sr_store(), log_position - data.size(), xid(), get_binlog_cache())) #else xid())) -#endif /* WITH_WSREP_SR_SPEEDUP_REPLAY */ +#endif /* WITH_WSREP_SR_SPEEDUP */ { ret = 1; error = wsrep::e_append_fragment_error; } - #ifdef DEBUG_SR_SPEEDUP wsrep::log_info() << "certify_fragment, line = " << __LINE__; #endif /* DEBUG_SR_SPEEDUP */ @@ -1668,7 +1667,6 @@ int wsrep::transaction::certify_fragment( else { streaming_context_.stored(sr_ws_meta.seqno()); - } client_service_.debug_crash( "crash_replicate_fragment_success"); @@ -2059,6 +2057,10 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) } if (is_streaming()) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "before clear_fragments, line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ clear_fragments(); } provider().release(ws_handle_); @@ -2069,10 +2071,10 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) if (is_streaming()) { #ifdef DEBUG_SR_SPEEDUP - wsrep::log_info() << "before remove_fragments, line = " - << __LINE__; + wsrep::log_info() << "before remove_fragments, line = " + << __LINE__; #endif /* DEBUG_SR_SPEEDUP */ - client_service_.remove_fragments(); + client_service_.remove_fragments(); clear_fragments(); } state(lock, s_aborted); @@ -2092,9 +2094,13 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) void wsrep::transaction::clear_fragments() { #ifdef DEBUG_SR_SPEEDUP - wsrep::log_info() << "END: wsrep::transaction::clear_fragments"; + wsrep::log_info() << "END: wsrep::transaction::clear_fragments: id = " + << id_ << ", server = |" << server_id_ << "|"; #endif /* DEBUG_SR_SPEEDUP */ streaming_context_.cleanup(); +#ifdef WITH_WSREP_SR_SPEEDUP + fragment_cache_remove_transaction(server_id_, id_); +#endif /* WITH_WSREP_SR_SPEEDUP */ } void wsrep::transaction::cleanup()