From f80f97db2f6549cf3871a0c3018fd04291f08dab Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Thu, 14 Feb 2019 00:08:23 +0100 Subject: [PATCH] Record database log_postion for streaming Class streaming_context can now keep track of database specific log position for streaming. Previously, it was assumed that the log position was based of off the buffer size which was filled in client_service::prepare_fragment_for_replication(). However, for XA the buffer may contain data that is not in the log. Therefore, subsequent calls to prepare_fragment_for_replication() could not find the appropriate log_position based on the certified_bytes(), which would returns the total number of bytes return in the buffers that were certified. --- include/wsrep/client_service.hpp | 2 +- include/wsrep/streaming_context.hpp | 13 +++++++++++++ src/transaction.cpp | 4 +++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index f7f1c7c..0007966 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -105,7 +105,7 @@ namespace wsrep * If there is no data to replicate, the method shall return * zero and leave the buffer empty. */ - virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; + virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&, size_t&) = 0; /** * Remove fragments from the storage within current transaction. diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index bccad9d..682c34d 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -47,6 +47,7 @@ namespace wsrep , fragment_size_() , bytes_certified_() , unit_counter_() + , log_position_() { } /** @@ -167,6 +168,16 @@ namespace wsrep unit_counter_ = 0; } + size_t log_position() const + { + return log_position_; + } + + void set_log_position(size_t position) + { + log_position_ = position; + } + const std::vector& fragments() const { return fragments_; @@ -184,6 +195,7 @@ namespace wsrep rollback_replicated_for_ = wsrep::transaction_id::undefined(); bytes_certified_ = 0; unit_counter_ = 0; + log_position_ = 0; } private: @@ -200,6 +212,7 @@ namespace wsrep size_t fragment_size_; size_t bytes_certified_; size_t unit_counter_; + size_t log_position_; }; } diff --git a/src/transaction.cpp b/src/transaction.cpp index 26a728b..df7869a 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -1198,13 +1198,15 @@ int wsrep::transaction::certify_fragment( lock.unlock(); wsrep::mutable_buffer data; - if (client_service_.prepare_fragment_for_replication(data)) + size_t log_position(0); + if (client_service_.prepare_fragment_for_replication(data, log_position)) { lock.lock(); state(lock, s_must_abort); client_state_.override_error(wsrep::e_error_during_commit); return 1; } + streaming_context_.set_log_position(log_position); if (data.size() == 0) {