diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index f7f1c7c..0007966 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -105,7 +105,7 @@ namespace wsrep * If there is no data to replicate, the method shall return * zero and leave the buffer empty. */ - virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; + virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&, size_t&) = 0; /** * Remove fragments from the storage within current transaction. diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index bccad9d..682c34d 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -47,6 +47,7 @@ namespace wsrep , fragment_size_() , bytes_certified_() , unit_counter_() + , log_position_() { } /** @@ -167,6 +168,16 @@ namespace wsrep unit_counter_ = 0; } + size_t log_position() const + { + return log_position_; + } + + void set_log_position(size_t position) + { + log_position_ = position; + } + const std::vector& fragments() const { return fragments_; @@ -184,6 +195,7 @@ namespace wsrep rollback_replicated_for_ = wsrep::transaction_id::undefined(); bytes_certified_ = 0; unit_counter_ = 0; + log_position_ = 0; } private: @@ -200,6 +212,7 @@ namespace wsrep size_t fragment_size_; size_t bytes_certified_; size_t unit_counter_; + size_t log_position_; }; } diff --git a/src/transaction.cpp b/src/transaction.cpp index 26a728b..df7869a 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -1198,13 +1198,15 @@ int wsrep::transaction::certify_fragment( lock.unlock(); wsrep::mutable_buffer data; - if (client_service_.prepare_fragment_for_replication(data)) + size_t log_position(0); + if (client_service_.prepare_fragment_for_replication(data, log_position)) { lock.lock(); state(lock, s_must_abort); client_state_.override_error(wsrep::e_error_during_commit); return 1; } + streaming_context_.set_log_position(log_position); if (data.size() == 0) {