From 434b52c75f0073ff1a03ce3243f6b61c5d757a57 Mon Sep 17 00:00:00 2001 From: Pekka Lampio Date: Mon, 17 May 2021 10:36:17 +0300 Subject: [PATCH] This is the first version of the SR speedup feature for MariaDB 10.6. Now all galera_sr tests succeed when SR speedup is disabled and most galera_sr MTR tests succeed when SR speedup up is enabled. The failing tests are listed in "skipped_tests" file. The following MTR test suite run succeeds: (cd mysql-test; ./mysql-test-run.pl --mysqld="--wsrep_SR_store=undolog" --suite=galera_sr --skip-test-list=../skipped_tests --force) --- CMakeLists.txt | 13 ++ include/wsrep/client_service.hpp | 8 ++ include/wsrep/client_state.hpp | 11 +- include/wsrep/high_priority_service.hpp | 9 ++ include/wsrep/storage_service.hpp | 12 +- include/wsrep/streaming_context.hpp | 14 ++ include/wsrep/transaction.hpp | 4 + src/server_state.cpp | 58 ++++++-- src/transaction.cpp | 174 +++++++++++++++++++++++- 9 files changed, 290 insertions(+), 13 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 740f538..f3d1490 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,19 @@ option(WSREP_LIB_WITH_COVERAGE "Compile with coverage instrumentation" OFF) option(WSREP_LIB_STRICT_BUILD_FLAGS "Compile with strict build flags" OFF) option(WSREP_LIB_MAINTAINER_MODE "Fail compilation on any warnings" OFF) +# +# Option for WSREP's Streaming Replication speedup +# +option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" ${WITH_WSREP}) +#option(WITH_WSREP_SR_SPEEDUP "WSREP's Streaming Replication speedup" OFF) +## add_definitions(-DWITH_WSREP_SR_SPEEDUP_CHECK) +#option(WITH_WSREP_SR_SPEEDUP_TIME "Timing for Streaming Replication speedup" ON) +if (WITH_WSREP_SR_SPEEDUP) + ADD_DEFINITIONS(-DWITH_WSREP_SR_SPEEDUP) + ADD_DEFINITIONS(-DDEBUG_SR_SPEEDUP) +endif() + + # Compiler options # Set std to C++0x/C++11 if superproject has not set standard yet diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index d47396d..78d097d 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -217,6 +217,14 @@ namespace wsrep * been enabled. */ virtual void debug_crash(const char* crash_point) = 0; +#ifdef WITH_WSREP_SR_SPEEDUP + + /** + * Return the binlog cache for the currently execution + * transaction or a NULL pointer if no such cache exists. + */ + virtual void *get_binlog_cache() = 0; +#endif /* WITH_WSREP_SR_SPEEDUP */ }; } diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 3c518d6..9403bf2 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -926,7 +926,16 @@ namespace wsrep { return transaction_; } - +#ifdef WITH_WSREP_SR_SPEEDUP + /** + * Return a reference to the transaction associated + * with the client state. + */ + wsrep::transaction& transaction() + { + return transaction_; + } +#endif /* WITH_WSREP_SR_SPEEDUP */ const wsrep::ws_meta& toi_meta() const { return toi_meta_; diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index f1d011e..99567c7 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -98,6 +98,9 @@ namespace wsrep const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store, +#endif /* WITH_WSREP_SR_SPEEDUP */ const wsrep::xid& xid) = 0; /** @@ -144,8 +147,14 @@ namespace wsrep * * @return Zero in case of success, non-zero in case of failure */ +#ifdef WITH_WSREP_SR_SPEEDUP + virtual int rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool skip_rollback = false) = 0; +#else virtual int rollback(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) = 0; +#endif /* WITH_WSREP_SR_SPEEDUP */ /** * Apply a TOI operation. diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index e68548b..9dcfb3a 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -65,8 +65,14 @@ namespace wsrep wsrep::transaction_id client_id, int flags, const wsrep::const_buffer& data, +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store, + size_t offset, + const wsrep::xid& xid, + void *binlog_cache) = 0; +#else const wsrep::xid& xid) = 0; - +#endif /* WITH_WSREP_SR_SPEEDUP */ /** * Update fragment meta data after certification process. */ @@ -77,6 +83,10 @@ namespace wsrep * adopted a transaction prior this call. */ virtual int remove_fragments() = 0; +#ifdef WITH_WSREP_SR_SPEEDUP + + int set_fragments_from_table() {return 0;}; +#endif /* WITH_WSREP_SR_SPEEDUP */ /** * Commit the transaction. diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 9b205c5..e8a1eaf 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -189,6 +189,17 @@ namespace wsrep unit_counter_ = 0; log_position_ = 0; } +#ifdef WITH_WSREP_SR_SPEEDUP + void set_sr_store(int store_type) + { + sr_store_ = store_type; + } + + int get_sr_store() const + { + return (sr_store_); + } +#endif /* WITH_WSREP_SR_SPEEDUP */ private: void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) @@ -204,6 +215,9 @@ namespace wsrep size_t fragment_size_; size_t unit_counter_; size_t log_position_; +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store_; +#endif /* WITH_WSREP_SR_SPEEDUP */ }; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 7ad1090..942d216 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -142,6 +142,10 @@ namespace wsrep void xa_detach(); int xa_replay(wsrep::unique_lock&); +#ifdef WITH_WSREP_SR_SPEEDUP + int set_fragments_from_table(); + void *get_binlog_cache(); +#endif /* WITH_WSREP_SR_SPEEDUP */ bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } diff --git a/src/server_state.cpp b/src/server_state.cpp index 32fff87..9cecf17 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -87,15 +87,29 @@ static int apply_fragment(wsrep::server_state& server_state, wsrep::high_priority_service* streaming_applier, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) + const wsrep::const_buffer& data, int line) { int ret(0); int apply_err; wsrep::mutable_buffer err; +#ifdef WITH_WSREP_SR_SPEEDUP + int sr_store = streaming_applier->transaction().streaming_context(). + get_sr_store(); +#endif /* WITH_WSREP_SR_SPEEDUP */ +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "server_state: apply_fragment, line = " << line; +#endif /* DEBUG_SR_SPEEDUP */ { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); apply_err = streaming_applier->apply_write_set(ws_meta, data, err); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "server_state: apply_fragment, line = " + << __LINE__ << ", apply_err = " << apply_err + << ", sr_store = " + << streaming_applier->transaction( + ).streaming_context().get_sr_store(); +#endif /* DEBUG_SR_SPEEDUP */ if (!apply_err) { assert(err.size() == 0); @@ -124,7 +138,10 @@ static int apply_fragment(wsrep::server_state& server_state, } } } - +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "server_state: apply_fragment, line = " + << __LINE__ << ", ret = " << ret; +#endif /* DEBUG_SR_SPEEDUP */ if (!ret) { if (!apply_err) @@ -132,7 +149,11 @@ static int apply_fragment(wsrep::server_state& server_state, high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); const wsrep::xid xid(streaming_applier->transaction().xid()); ret = high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data, xid); +#ifdef WITH_WSREP_SR_SPEEDUP + ws_handle, ws_meta, data, sr_store, xid); +#else + ws_handle, ws_meta, data, xid); +#endif /* WITH_WSREP_SR_SPEEDUP */ high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); ret = ret || (high_priority_service.after_apply(), 0); } @@ -357,7 +378,7 @@ static int apply_write_set(wsrep::server_state& server_state, sa, ws_handle, ws_meta, - data); + data, __LINE__); } else if (ws_meta.flags() == 0 || wsrep::prepares_transaction(ws_meta.flags())) { @@ -386,7 +407,7 @@ static int apply_write_set(wsrep::server_state& server_state, sa, ws_handle, ws_meta, - data); + data, __LINE__); } } else if (wsrep::commits_transaction(ws_meta.flags())) @@ -1145,6 +1166,10 @@ void wsrep::server_state::start_streaming_client( WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), wsrep::log::debug_level_server_state, "Start streaming client: " << client_state->id()); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "server_state: start_streaming_client"; +#endif /* DEBUG_SR_SPEEDUP */ + if (streaming_clients_.insert( std::make_pair(client_state->id(), client_state)).second == false) { @@ -1246,6 +1271,10 @@ void wsrep::server_state::start_streaming_applier( wsrep::high_priority_service* sa) { wsrep::unique_lock lock(mutex_); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "start_streaming_applier: wsrep_trx_id = " + << transaction_id; +#endif /* DEBUG_SR_SPEEDUP */ if (streaming_appliers_.insert( std::make_pair(std::make_pair(server_id, transaction_id), sa)).second == false) @@ -1559,6 +1588,9 @@ void wsrep::server_state::close_orphaned_sr_transactions( void wsrep::server_state::close_transactions_at_disconnect( wsrep::high_priority_service& high_priority_service) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ << "(" << __LINE__ << ")"; +#endif /* DEBUG_SR_SPEEDUP */ // Close streaming applier without removing fragments // from fragment storage. When the server is started again, // it must be able to recover ongoing streaming transactions. @@ -1569,9 +1601,19 @@ void wsrep::server_state::close_transactions_at_disconnect( { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); - streaming_applier->rollback( - wsrep::ws_handle(), wsrep::ws_meta()); - streaming_applier->after_apply(); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ << "(" << __LINE__ + << ") skipped rollback for wsrep::trx = " + << streaming_applier->transaction().id(); +#endif /* DEBUG_SR_SPEEDUP */ +#ifdef WITH_WSREP_SR_SPEEDUP + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta(), true); +#else + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta()); +#endif /* WITH_WSREP_SR_SPEEDUP */ + streaming_applier->after_apply(); } streaming_appliers_.erase(i++); server_service_.release_high_priority_service(streaming_applier); diff --git a/src/transaction.cpp b/src/transaction.cpp index 63ea2e4..b08bea4 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -29,6 +29,12 @@ #include #include +#ifdef WITH_WSREP_SR_SPEEDUP +extern void *wsrep_get_current_thd(); +extern int fragment_cache_remove_transaction( + const wsrep::id&, wsrep::transaction_id); +#endif /* WITH_WSREP_SR_SPEEDUP */ + namespace { class storage_service_deleter @@ -129,6 +135,10 @@ int wsrep::transaction::start_transaction( state_hist_.clear(); ws_handle_ = wsrep::ws_handle(id); flags(wsrep::provider::flag::start_transaction); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__ + << ": id = " << id.get(); +#endif /* DEBUG_SR_SPEEDUP */ switch (client_state_.mode()) { case wsrep::client_state::m_high_priority: @@ -175,6 +185,10 @@ int wsrep::transaction::start_transaction( certified_ = true; } debug_log_state("start_transaction leave"); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__ + << ": id = " << id().get(); +#endif /* DEBUG_SR_SPEEDUP */ return 0; } @@ -202,6 +216,12 @@ void wsrep::transaction::fragment_applied(wsrep::seqno seqno) { assert(active()); streaming_context_.applied(seqno); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "wsrep::transaction::fragment_applied: " + << "SR context = " << (void *)&streaming_context_ + << ", size = " << streaming_context_.fragments_stored() + << ", seqno = " << seqno.get(); +#endif /* DEBUG_SR_SPEEDUP */ } int wsrep::transaction::prepare_for_ordering( @@ -250,6 +270,9 @@ int wsrep::transaction::append_key(const wsrep::key& key) int wsrep::transaction::append_data(const wsrep::const_buffer& data) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ << ": size = " << data.size(); +#endif /* DEBUG_SR_SPEEDUP */ return provider().append_data(ws_handle_, data); } @@ -262,6 +285,9 @@ int wsrep::transaction::after_row() if (streaming_context_.fragment_size() && streaming_context_.fragment_unit() != streaming_context::statement) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__; +#endif /* DEBUG_SR_SPEEDUP */ ret = streaming_step(lock); } debug_log_state("after_row_leave"); @@ -304,6 +330,10 @@ int wsrep::transaction::before_prepare( // Note: we can't remove fragments here for XA, // the transaction has already issued XA END and // is in IDLE state, no more changes allowed! +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "before remove_fragments, line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ ret = client_service_.remove_fragments(); if (ret) { @@ -329,6 +359,9 @@ int wsrep::transaction::before_prepare( flags(flags() | wsrep::provider::flag::pa_unsafe); append_sr_keys_for_commit(); const bool force_streaming_step = true; +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ << ", line = " << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ ret = streaming_step(lock, force_streaming_step); if (ret == 0) { @@ -339,6 +372,10 @@ int wsrep::transaction::before_prepare( else { ret = certify_commit(lock); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "after certify_commit(): line = " + << __LINE__ << ", ret = " << ret; +#endif /* DEBUG_SR_SPEEDUP */ } assert((ret == 0 && state() == s_preparing) || @@ -485,6 +522,10 @@ int wsrep::transaction::before_commit() if (ret == 0 && state() == s_prepared) { ret = certify_commit(lock); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "after certify_commit(): line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ assert((ret == 0 && state() == s_committing) || (state() == s_must_abort || state() == s_must_replay || @@ -589,6 +630,10 @@ int wsrep::transaction::ordered_commit() { state(lock, s_ordered_commit); } +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "wsrep::transaction::" << __FUNCTION__ + << ": id = " << id().get() << ", --> " << ret; +#endif /* DEBUG_SR_SPEEDUP */ debug_log_state("ordered_commit_leave"); return ret; } @@ -619,7 +664,11 @@ int wsrep::transaction::after_commit() wsrep::storage_service& storage_service( sr_scope.storage_service()); storage_service.adopt_transaction(*this); - storage_service.remove_fragments(); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "before remove_fragments, line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ + storage_service.remove_fragments(); storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); lock.lock(); } @@ -654,6 +703,10 @@ int wsrep::transaction::after_commit() int wsrep::transaction::before_rollback() { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "BEGIN: wsrep::transaction::before_rollback: id = " + << id().get(); +#endif /* DEBUG_SR_SPEEDUP */ wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("before_rollback_enter"); assert(state() == s_executing || @@ -736,11 +789,18 @@ int wsrep::transaction::before_rollback() } debug_log_state("before_rollback_leave"); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "END: wsrep::transaction::before_rollback"; +#endif /* DEBUG_SR_SPEEDUP */ return 0; } int wsrep::transaction::after_rollback() { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "BEGIN: wsrep::transaction::after_rollback: id = " + << id().get(); +#endif /* DEBUG_SR_SPEEDUP */ wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_rollback_enter"); assert(state() == s_aborting || @@ -759,7 +819,11 @@ int wsrep::transaction::after_rollback() wsrep::storage_service& storage_service( sr_scope.storage_service()); storage_service.adopt_transaction(*this); - storage_service.remove_fragments(); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "before remove_fragments, line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ + storage_service.remove_fragments(); storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); } lock.lock(); @@ -783,6 +847,9 @@ int wsrep::transaction::after_rollback() // releasing the commit ordering critical section should be // also postponed until all resources have been released. debug_log_state("after_rollback_leave"); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "END: wsrep::transaction::after_rollback"; +#endif /* DEBUG_SR_SPEEDUP */ return 0; } @@ -819,6 +886,9 @@ int wsrep::transaction::after_statement() streaming_context_.fragment_size() && streaming_context_.fragment_unit() == streaming_context::statement) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ << ", line = " << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ ret = streaming_step(lock); } @@ -1237,6 +1307,34 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock& lock) return ret; } +#ifdef WITH_WSREP_SR_SPEEDUP +int wsrep::transaction::set_fragments_from_table() +{ + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + + int rcode = storage_service.set_fragments_from_table(); + + return (rcode); +} + +void *wsrep::transaction::get_binlog_cache() +{ + void *cache = client_service_.get_binlog_cache(); + + assert(cache); + + return (cache); +} + +#endif /* WITH_WSREP_SR_SPEEDUP */ + + //////////////////////////////////////////////////////////////////////////////// // Private // //////////////////////////////////////////////////////////////////////////////// @@ -1345,6 +1443,14 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, assert(lock.owns_lock()); assert(streaming_context_.fragment_size() || is_xa()); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ + << ": bytes_generated = " + << client_service_.bytes_generated() + << ", log_position = " + << streaming_context_.log_position(); + +#endif /* DEBUG_SR_SPEEDUP */ if (client_service_.bytes_generated() < streaming_context_.log_position()) { @@ -1388,6 +1494,10 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, if (streaming_context_.fragment_size_exceeded() || force) { streaming_context_.reset_unit_counter(); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << __FUNCTION__ << ": bytes_to_replicate = " + << bytes_to_replicate; +#endif /* DEBUG_SR_SPEEDUP */ ret = certify_fragment(lock); } @@ -1397,6 +1507,9 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, int wsrep::transaction::certify_fragment( wsrep::unique_lock& lock) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "BEGIN certify_fragment"; +#endif /* DEBUG_SR_SPEEDUP */ assert(lock.owns_lock()); assert(client_state_.mode() == wsrep::client_state::m_local); @@ -1424,6 +1537,15 @@ int wsrep::transaction::certify_fragment( } streaming_context_.set_log_position(log_position); +#ifdef WITH_WSREP_SR_SPEEDUP +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "certify_fragment, line = " << __LINE__ + << ", log_position = " << log_position + << ", data size = " << data.size() + << ", wsrep_get_current_thd() = " + << wsrep_get_current_thd(); +#endif /* DEBUG_SR_SPEEDUP */ +#endif /* WITH_WSREP_SR_SPEEDUP */ if (data.size() == 0) { wsrep::log_warning() << "Attempt to replicate empty data buffer"; @@ -1432,6 +1554,9 @@ int wsrep::transaction::certify_fragment( return 0; } +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "certify_fragment, line = " << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ if (provider().append_data(ws_handle_, wsrep::const_buffer(data.data(), data.size()))) { @@ -1440,6 +1565,9 @@ int wsrep::transaction::certify_fragment( client_state_.override_error(wsrep::e_error_during_commit); return 1; } +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "certify_fragment, line = " << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ if (is_xa()) { @@ -1490,12 +1618,20 @@ int wsrep::transaction::certify_fragment( id(), flags(), wsrep::const_buffer(data.data(), data.size()), +#ifdef WITH_WSREP_SR_SPEEDUP + streaming_context_.get_sr_store(), + log_position - data.size(), + xid(), get_binlog_cache())) +#else xid())) +#endif /* WITH_WSREP_SR_SPEEDUP */ { ret = 1; error = wsrep::e_append_fragment_error; } - +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "certify_fragment, line = " << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ if (ret == 0) { client_service_.debug_crash( @@ -1620,6 +1756,9 @@ int wsrep::transaction::certify_fragment( state(lock, s_executing); flags(flags() & ~wsrep::provider::flag::start_transaction); } +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "END certify_fragment"; +#endif /* DEBUG_SR_SPEEDUP */ return ret; } @@ -1630,6 +1769,9 @@ int wsrep::transaction::certify_commit( assert(active()); client_service_.wait_for_replayers(lock); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "BEGIN certify_commit"; +#endif /* DEBUG_SR_SPEEDUP */ assert(lock.owns_lock()); if (abort_or_interrupt(lock)) @@ -1805,6 +1947,11 @@ int wsrep::transaction::certify_commit( break; } +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "END certify_commit: trx_id = " + << id().get() + << ", cert_ret = " << cert_ret; +#endif /* DEBUG_SR_SPEEDUP */ return ret; } @@ -1831,6 +1978,9 @@ int wsrep::transaction::append_sr_keys_for_commit() void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lock) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "BEGIN: wsrep::transaction::streaming_rollback"; +#endif /* DEBUG_SR_SPEEDUP */ debug_log_state("streaming_rollback enter"); assert(state_ != s_must_replay); assert(is_streaming()); @@ -1874,6 +2024,9 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lo } } debug_log_state("streaming_rollback leave"); +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "END: wsrep::transaction::streaming_rollback"; +#endif /* DEBUG_SR_SPEEDUP */ } int wsrep::transaction::replay(wsrep::unique_lock& lock) @@ -1904,6 +2057,10 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) } if (is_streaming()) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "before clear_fragments, line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ clear_fragments(); } provider().release(ws_handle_); @@ -1913,6 +2070,10 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) wsrep::e_deadlock_error); if (is_streaming()) { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "before remove_fragments, line = " + << __LINE__; +#endif /* DEBUG_SR_SPEEDUP */ client_service_.remove_fragments(); clear_fragments(); } @@ -1932,7 +2093,14 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) void wsrep::transaction::clear_fragments() { +#ifdef DEBUG_SR_SPEEDUP + wsrep::log_info() << "END: wsrep::transaction::clear_fragments: id = " + << id_ << ", server = |" << server_id_ << "|"; +#endif /* DEBUG_SR_SPEEDUP */ streaming_context_.cleanup(); +#ifdef WITH_WSREP_SR_SPEEDUP + fragment_cache_remove_transaction(server_id_, id_); +#endif /* WITH_WSREP_SR_SPEEDUP */ } void wsrep::transaction::cleanup()