1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-22 23:21:53 +03:00

Record database log_postion for streaming

Class streaming_context can now keep track of database specific log
position for streaming. Previously, it  was assumed that the log
position was based of off the buffer size which was filled in
client_service::prepare_fragment_for_replication(). However, for XA
the buffer may contain data that is not in the log. Therefore,
subsequent calls to prepare_fragment_for_replication() could not find
the appropriate log_position based on the certified_bytes(), which
would returns the total number of bytes return in the buffers that
were certified.
This commit is contained in:
Daniele Sciascia
2019-02-14 00:08:23 +01:00
parent 24cd49b23f
commit f80f97db2f
3 changed files with 17 additions and 2 deletions

View File

@ -105,7 +105,7 @@ namespace wsrep
* If there is no data to replicate, the method shall return
* zero and leave the buffer empty.
*/
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0;
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&, size_t&) = 0;
/**
* Remove fragments from the storage within current transaction.

View File

@ -47,6 +47,7 @@ namespace wsrep
, fragment_size_()
, bytes_certified_()
, unit_counter_()
, log_position_()
{ }
/**
@ -167,6 +168,16 @@ namespace wsrep
unit_counter_ = 0;
}
size_t log_position() const
{
return log_position_;
}
void set_log_position(size_t position)
{
log_position_ = position;
}
const std::vector<wsrep::seqno>& fragments() const
{
return fragments_;
@ -184,6 +195,7 @@ namespace wsrep
rollback_replicated_for_ = wsrep::transaction_id::undefined();
bytes_certified_ = 0;
unit_counter_ = 0;
log_position_ = 0;
}
private:
@ -200,6 +212,7 @@ namespace wsrep
size_t fragment_size_;
size_t bytes_certified_;
size_t unit_counter_;
size_t log_position_;
};
}

View File

@ -1198,13 +1198,15 @@ int wsrep::transaction::certify_fragment(
lock.unlock();
wsrep::mutable_buffer data;
if (client_service_.prepare_fragment_for_replication(data))
size_t log_position(0);
if (client_service_.prepare_fragment_for_replication(data, log_position))
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_error_during_commit);
return 1;
}
streaming_context_.set_log_position(log_position);
if (data.size() == 0)
{