1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-02 05:22:26 +03:00

Cache rollback events that failed to replicate for later retry

This patch introduces a queue to store ids of transactions that failed
to send a rollback fragment in streaming_rollback(). This is to avoid
potentially  missed rollback fragments when a cluster splits and then
later reforms. Rollback fragments would be missing if a node rolled
back a transaction locally (either BFed or voluntary rollback) while
non-primary, and the attempt to send rollback fragment failed in
transaction::streaming_rollback().
Transaction that fail to send rollback fragment can proceed to
rollback locally.  However we must ensure that rollback fragments for
those transactions are eventually delivered by the cluster. This must
be done before a potentially conflicting writeset causes BF-BF
conflicts in the rest of the cluster.
This commit is contained in:
Daniele Sciascia
2021-09-24 10:45:34 +02:00
parent efb4aab090
commit 22921e7082
7 changed files with 443 additions and 28 deletions

View File

@ -92,6 +92,7 @@
#include "compiler.hpp"
#include "xid.hpp"
#include <deque>
#include <vector>
#include <string>
#include <map>
@ -271,6 +272,17 @@ namespace wsrep
wsrep::high_priority_service* find_streaming_applier(
const wsrep::xid& xid) const;
/**
* Queue a rollback fragment the transaction with given id
*/
void queue_rollback_event(const wsrep::transaction_id& id);
/**
* Send rollback fragments for previously queued events via
* queue_rollback_event()
*/
enum wsrep::provider::status send_pending_rollback_events();
/**
* Load WSRep provider.
*
@ -632,6 +644,7 @@ namespace wsrep
, previous_primary_view_()
, current_view_()
, last_committed_gtid_()
, rollback_event_queue_()
{ }
private:
@ -670,6 +683,11 @@ namespace wsrep
void go_final(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::view&, wsrep::high_priority_service*);
// Send rollback fragments for all transactions in
// rollback_event_queue_
enum wsrep::provider::status send_pending_rollback_events(
wsrep::unique_lock<wsrep::mutex>& lock);
wsrep::mutex& mutex_;
wsrep::condition_variable& cond_;
wsrep::server_service& server_service_;
@ -693,20 +711,6 @@ namespace wsrep
streaming_clients_map streaming_clients_;
typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>,
wsrep::high_priority_service*> streaming_appliers_map;
class server_id_cmp
{
public:
server_id_cmp(const wsrep::id& server_id)
: server_id_(server_id)
{ }
bool operator()(const std::vector<wsrep::view::member>::value_type& vt) const
{
return (vt.id() == server_id_);
}
private:
wsrep::id server_id_;
};
streaming_appliers_map streaming_appliers_;
bool streaming_appliers_recovered_;
wsrep::provider* provider_;
@ -722,9 +726,9 @@ namespace wsrep
wsrep::view previous_primary_view_;
wsrep::view current_view_;
wsrep::gtid last_committed_gtid_;
std::deque<wsrep::transaction_id> rollback_event_queue_;
};
static inline const char* to_c_string(
enum wsrep::server_state::state state)
{