From 096b5c5c7a28a5ad73870ffca58bad839a7e638e Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Fri, 10 Nov 2023 10:00:26 +0100 Subject: [PATCH] Remove tracking of log_position This patch removes log_position from streaming_context. The log_position was meant for keeping track of the database specific position corresponding to the changes that have been already replicated by a streaming transaction. In reality, things may be more complex in the DBMS side and a size_t may be unsufficient to keep track of the progress of a streaming transaction. For example, in MariaDB, it may be necessary to keep track of positions in both transaction and statement caches. Suggesting that the responsibility of tracking these position(s) should be delegated to client_service. The log_position was also used to do sanity checks in streaming_step(). Those sanity checks are preserved by simply keeping track of the number of bytes that were certified by the streaming transaction. --- dbsim/db_client_service.hpp | 4 +--- include/wsrep/client_service.hpp | 8 ++------ include/wsrep/streaming_context.hpp | 30 ++++++++++++++--------------- src/streaming_context.cpp | 2 +- src/transaction.cpp | 28 ++++++++++++--------------- test/mock_client_state.hpp | 3 +-- 6 files changed, 31 insertions(+), 44 deletions(-) diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index be6f9ad..1af29f6 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -51,10 +51,8 @@ namespace db { return true; } - int prepare_fragment_for_replication(wsrep::mutable_buffer&, - size_t& position) override + int prepare_fragment_for_replication(wsrep::mutable_buffer&) override { - position = 0; return 0; } int remove_fragments() override { return 0; } diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index d47396d..1f83e4d 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -88,17 +88,13 @@ namespace wsrep /** * Prepare a buffer containing data for the next fragment to replicate. - * The caller may set log_position to record the database specific - * position corresponding to changes contained in the buffer. - * When the call returns, the log_position will be available to read - * from streaming_context::log_position(). * * @return Zero in case of success, non-zero on failure. * If there is no data to replicate, the method shall return * zero and leave the buffer empty. */ - virtual int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, - size_t& log_position) = 0; + virtual int + prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) = 0; /** * Remove fragments from the storage within current transaction. diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 2b0ef0f..7796418 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -42,12 +42,12 @@ namespace wsrep streaming_context() : fragments_certified_() + , bytes_certified_() , fragments_() , rollback_replicated_for_() , fragment_unit_() , fragment_size_() , unit_counter_() - , log_position_() { } /** @@ -78,10 +78,14 @@ namespace wsrep /** Disable streaming replication. */ void disable(); - /** Increment counter for certified fragments. */ - void certified() + /** + * Increment counter for certified fragments and total + * number of bytes. + */ + void certified(size_t bytes) { ++fragments_certified_; + bytes_certified_ += bytes; } /** Return number of certified fragments. */ @@ -90,6 +94,12 @@ namespace wsrep return fragments_certified_; } + /** Return total number of bytes replicated. */ + size_t bytes_certified() const + { + return bytes_certified_; + } + /** Mark fragment with seqno as stored in fragment store. */ void stored(wsrep::seqno seqno); @@ -137,18 +147,6 @@ namespace wsrep unit_counter_ = 0; } - /** Return current log position. */ - size_t log_position() const - { - return log_position_; - } - - /** Set log position. */ - void set_log_position(size_t position) - { - log_position_ = position; - } - /** Return vector of stored fragments. */ const std::vector& fragments() const { @@ -168,12 +166,12 @@ namespace wsrep void check_fragment_seqno(wsrep::seqno seqno); size_t fragments_certified_; + size_t bytes_certified_; std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; enum fragment_unit fragment_unit_; size_t fragment_size_; size_t unit_counter_; - size_t log_position_; }; } diff --git a/src/streaming_context.cpp b/src/streaming_context.cpp index c542307..2fef179 100644 --- a/src/streaming_context.cpp +++ b/src/streaming_context.cpp @@ -81,10 +81,10 @@ void wsrep::streaming_context::rolled_back(wsrep::transaction_id id) void wsrep::streaming_context::cleanup() { fragments_certified_ = 0; + bytes_certified_ = 0; fragments_.clear(); rollback_replicated_for_ = wsrep::transaction_id::undefined(); unit_counter_ = 0; - log_position_ = 0; } void wsrep::streaming_context::check_fragment_seqno( diff --git a/src/transaction.cpp b/src/transaction.cpp index 451e94d..2e1a984 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -1416,24 +1416,23 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, bool force) { assert(lock.owns_lock()); - assert(streaming_context_.fragment_size() || is_xa()); + assert(streaming_context_.fragment_size() || force); - if (client_service_.bytes_generated() < - streaming_context_.log_position()) + const size_t bytes_generated(client_service_.bytes_generated()); + const size_t bytes_certified(streaming_context_.bytes_certified()); + + if (bytes_generated < bytes_certified) { /* Something went wrong on DBMS side in keeping track of generated bytes. Return an error to abort the transaction. */ - wsrep::log_warning() << "Bytes generated " - << client_service_.bytes_generated() - << " less than bytes certified " - << streaming_context_.log_position() + wsrep::log_warning() << "Bytes generated " << bytes_generated + << " less than bytes certified " << bytes_certified << ", aborting streaming transaction"; return 1; } - int ret(0); - const size_t bytes_to_replicate(client_service_.bytes_generated() - - streaming_context_.log_position()); + int ret(0); + const size_t bytes_to_replicate(bytes_generated - bytes_certified); switch (streaming_context_.fragment_unit()) { @@ -1500,15 +1499,13 @@ int wsrep::transaction::certify_fragment( } wsrep::mutable_buffer data; - size_t log_position(0); - if (client_service_.prepare_fragment_for_replication(data, log_position)) + if (client_service_.prepare_fragment_for_replication(data)) { lock.lock(); state(lock, s_must_abort); client_state_.override_error(wsrep::e_error_during_commit); return 1; } - streaming_context_.set_log_position(log_position); if (data.size() == 0) { @@ -1616,7 +1613,7 @@ int wsrep::transaction::certify_fragment( case wsrep::provider::success: ++fragments_certified_for_statement_; assert(sr_ws_meta.seqno().is_undefined() == false); - streaming_context_.certified(); + streaming_context_.certified(data.size()); if (storage_service.update_fragment_meta(sr_ws_meta)) { storage_service.rollback(wsrep::ws_handle(), @@ -1656,7 +1653,7 @@ int wsrep::transaction::certify_fragment( // we take a risk of sending one rollback fragment for nothing. storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta()); - streaming_context_.certified(); + streaming_context_.certified(data.size()); ret = 1; error = wsrep::e_deadlock_error; break; @@ -2147,7 +2144,6 @@ void wsrep::transaction::debug_log_state( << ", unit: " << streaming_context_.fragment_unit() << ", size: " << streaming_context_.fragment_size() << ", counter: " << streaming_context_.unit_counter() - << ", log_pos: " << streaming_context_.log_position() << ", sr_rb: " << streaming_context_.rolled_back() << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) << " thread_id: " << client_state_.owning_thread_id_ diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 73b2775..af77a85 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -149,7 +149,7 @@ namespace wsrep bool statement_allowed_for_streaming() const WSREP_OVERRIDE { return true; } - int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, size_t& position) + int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) WSREP_OVERRIDE { if (error_during_prepare_data_) @@ -159,7 +159,6 @@ namespace wsrep static const char buf[1] = { 1 }; buffer.push_back(&buf[0], &buf[1]); wsrep::const_buffer data(buffer.data(), buffer.size()); - position = buffer.size(); return client_state_->append_data(data); }