1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-30 07:23:07 +03:00

Remove tracking of log_position

This patch removes log_position from streaming_context. The
log_position was meant for keeping track of the database specific
position corresponding to the changes that have been already
replicated by a streaming transaction. In reality, things may be more
complex in the DBMS side and a size_t may be unsufficient to keep
track of the progress of a streaming transaction. For example, in
MariaDB, it may be necessary to keep track of positions in both
transaction and statement caches. Suggesting that the responsibility
of tracking these position(s) should be delegated to client_service.
The log_position was also used to do sanity checks in
streaming_step(). Those sanity checks are preserved by simply keeping
track of the number of bytes that were certified by the streaming
transaction.
This commit is contained in:
Daniele Sciascia
2023-11-10 10:00:26 +01:00
parent a5d95f0175
commit 096b5c5c7a
6 changed files with 31 additions and 44 deletions

View File

@ -51,10 +51,8 @@ namespace db
{ {
return true; return true;
} }
int prepare_fragment_for_replication(wsrep::mutable_buffer&, int prepare_fragment_for_replication(wsrep::mutable_buffer&) override
size_t& position) override
{ {
position = 0;
return 0; return 0;
} }
int remove_fragments() override { return 0; } int remove_fragments() override { return 0; }

View File

@ -88,17 +88,13 @@ namespace wsrep
/** /**
* Prepare a buffer containing data for the next fragment to replicate. * Prepare a buffer containing data for the next fragment to replicate.
* The caller may set log_position to record the database specific
* position corresponding to changes contained in the buffer.
* When the call returns, the log_position will be available to read
* from streaming_context::log_position().
* *
* @return Zero in case of success, non-zero on failure. * @return Zero in case of success, non-zero on failure.
* If there is no data to replicate, the method shall return * If there is no data to replicate, the method shall return
* zero and leave the buffer empty. * zero and leave the buffer empty.
*/ */
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, virtual int
size_t& log_position) = 0; prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) = 0;
/** /**
* Remove fragments from the storage within current transaction. * Remove fragments from the storage within current transaction.

View File

@ -42,12 +42,12 @@ namespace wsrep
streaming_context() streaming_context()
: fragments_certified_() : fragments_certified_()
, bytes_certified_()
, fragments_() , fragments_()
, rollback_replicated_for_() , rollback_replicated_for_()
, fragment_unit_() , fragment_unit_()
, fragment_size_() , fragment_size_()
, unit_counter_() , unit_counter_()
, log_position_()
{ } { }
/** /**
@ -78,10 +78,14 @@ namespace wsrep
/** Disable streaming replication. */ /** Disable streaming replication. */
void disable(); void disable();
/** Increment counter for certified fragments. */ /**
void certified() * Increment counter for certified fragments and total
* number of bytes.
*/
void certified(size_t bytes)
{ {
++fragments_certified_; ++fragments_certified_;
bytes_certified_ += bytes;
} }
/** Return number of certified fragments. */ /** Return number of certified fragments. */
@ -90,6 +94,12 @@ namespace wsrep
return fragments_certified_; return fragments_certified_;
} }
/** Return total number of bytes replicated. */
size_t bytes_certified() const
{
return bytes_certified_;
}
/** Mark fragment with seqno as stored in fragment store. */ /** Mark fragment with seqno as stored in fragment store. */
void stored(wsrep::seqno seqno); void stored(wsrep::seqno seqno);
@ -137,18 +147,6 @@ namespace wsrep
unit_counter_ = 0; unit_counter_ = 0;
} }
/** Return current log position. */
size_t log_position() const
{
return log_position_;
}
/** Set log position. */
void set_log_position(size_t position)
{
log_position_ = position;
}
/** Return vector of stored fragments. */ /** Return vector of stored fragments. */
const std::vector<wsrep::seqno>& fragments() const const std::vector<wsrep::seqno>& fragments() const
{ {
@ -168,12 +166,12 @@ namespace wsrep
void check_fragment_seqno(wsrep::seqno seqno); void check_fragment_seqno(wsrep::seqno seqno);
size_t fragments_certified_; size_t fragments_certified_;
size_t bytes_certified_;
std::vector<wsrep::seqno> fragments_; std::vector<wsrep::seqno> fragments_;
wsrep::transaction_id rollback_replicated_for_; wsrep::transaction_id rollback_replicated_for_;
enum fragment_unit fragment_unit_; enum fragment_unit fragment_unit_;
size_t fragment_size_; size_t fragment_size_;
size_t unit_counter_; size_t unit_counter_;
size_t log_position_;
}; };
} }

View File

@ -81,10 +81,10 @@ void wsrep::streaming_context::rolled_back(wsrep::transaction_id id)
void wsrep::streaming_context::cleanup() void wsrep::streaming_context::cleanup()
{ {
fragments_certified_ = 0; fragments_certified_ = 0;
bytes_certified_ = 0;
fragments_.clear(); fragments_.clear();
rollback_replicated_for_ = wsrep::transaction_id::undefined(); rollback_replicated_for_ = wsrep::transaction_id::undefined();
unit_counter_ = 0; unit_counter_ = 0;
log_position_ = 0;
} }
void wsrep::streaming_context::check_fragment_seqno( void wsrep::streaming_context::check_fragment_seqno(

View File

@ -1416,24 +1416,23 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
bool force) bool force)
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
assert(streaming_context_.fragment_size() || is_xa()); assert(streaming_context_.fragment_size() || force);
if (client_service_.bytes_generated() < const size_t bytes_generated(client_service_.bytes_generated());
streaming_context_.log_position()) const size_t bytes_certified(streaming_context_.bytes_certified());
if (bytes_generated < bytes_certified)
{ {
/* Something went wrong on DBMS side in keeping track of /* Something went wrong on DBMS side in keeping track of
generated bytes. Return an error to abort the transaction. */ generated bytes. Return an error to abort the transaction. */
wsrep::log_warning() << "Bytes generated " wsrep::log_warning() << "Bytes generated " << bytes_generated
<< client_service_.bytes_generated() << " less than bytes certified " << bytes_certified
<< " less than bytes certified "
<< streaming_context_.log_position()
<< ", aborting streaming transaction"; << ", aborting streaming transaction";
return 1; return 1;
} }
int ret(0);
const size_t bytes_to_replicate(client_service_.bytes_generated() - int ret(0);
streaming_context_.log_position()); const size_t bytes_to_replicate(bytes_generated - bytes_certified);
switch (streaming_context_.fragment_unit()) switch (streaming_context_.fragment_unit())
{ {
@ -1500,15 +1499,13 @@ int wsrep::transaction::certify_fragment(
} }
wsrep::mutable_buffer data; wsrep::mutable_buffer data;
size_t log_position(0); if (client_service_.prepare_fragment_for_replication(data))
if (client_service_.prepare_fragment_for_replication(data, log_position))
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
client_state_.override_error(wsrep::e_error_during_commit); client_state_.override_error(wsrep::e_error_during_commit);
return 1; return 1;
} }
streaming_context_.set_log_position(log_position);
if (data.size() == 0) if (data.size() == 0)
{ {
@ -1616,7 +1613,7 @@ int wsrep::transaction::certify_fragment(
case wsrep::provider::success: case wsrep::provider::success:
++fragments_certified_for_statement_; ++fragments_certified_for_statement_;
assert(sr_ws_meta.seqno().is_undefined() == false); assert(sr_ws_meta.seqno().is_undefined() == false);
streaming_context_.certified(); streaming_context_.certified(data.size());
if (storage_service.update_fragment_meta(sr_ws_meta)) if (storage_service.update_fragment_meta(sr_ws_meta))
{ {
storage_service.rollback(wsrep::ws_handle(), storage_service.rollback(wsrep::ws_handle(),
@ -1656,7 +1653,7 @@ int wsrep::transaction::certify_fragment(
// we take a risk of sending one rollback fragment for nothing. // we take a risk of sending one rollback fragment for nothing.
storage_service.rollback(wsrep::ws_handle(), storage_service.rollback(wsrep::ws_handle(),
wsrep::ws_meta()); wsrep::ws_meta());
streaming_context_.certified(); streaming_context_.certified(data.size());
ret = 1; ret = 1;
error = wsrep::e_deadlock_error; error = wsrep::e_deadlock_error;
break; break;
@ -2147,7 +2144,6 @@ void wsrep::transaction::debug_log_state(
<< ", unit: " << streaming_context_.fragment_unit() << ", unit: " << streaming_context_.fragment_unit()
<< ", size: " << streaming_context_.fragment_size() << ", size: " << streaming_context_.fragment_size()
<< ", counter: " << streaming_context_.unit_counter() << ", counter: " << streaming_context_.unit_counter()
<< ", log_pos: " << streaming_context_.log_position()
<< ", sr_rb: " << streaming_context_.rolled_back() << ", sr_rb: " << streaming_context_.rolled_back()
<< "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
<< " thread_id: " << client_state_.owning_thread_id_ << " thread_id: " << client_state_.owning_thread_id_

View File

@ -149,7 +149,7 @@ namespace wsrep
bool statement_allowed_for_streaming() const WSREP_OVERRIDE bool statement_allowed_for_streaming() const WSREP_OVERRIDE
{ return true; } { return true; }
int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, size_t& position) int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer)
WSREP_OVERRIDE WSREP_OVERRIDE
{ {
if (error_during_prepare_data_) if (error_during_prepare_data_)
@ -159,7 +159,6 @@ namespace wsrep
static const char buf[1] = { 1 }; static const char buf[1] = { 1 };
buffer.push_back(&buf[0], &buf[1]); buffer.push_back(&buf[0], &buf[1]);
wsrep::const_buffer data(buffer.data(), buffer.size()); wsrep::const_buffer data(buffer.data(), buffer.size());
position = buffer.size();
return client_state_->append_data(data); return client_state_->append_data(data);
} }