1
0
mirror of https://github.com/postgres/postgres.git synced 2026-01-27 21:43:08 +03:00

Add sequence synchronization for logical replication.

This patch introduces sequence synchronization. Sequences that are synced
will have 2 states:
   - INIT (needs [re]synchronizing)
   - READY (is already synchronized)

A new sequencesync worker is launched as needed to synchronize sequences.
A single sequencesync worker is responsible for synchronizing all
sequences. It begins by retrieving the list of sequences that are flagged
for synchronization, i.e., those in the INIT state. These sequences are
then processed in batches, allowing multiple entries to be synchronized
within a single transaction. The worker fetches the current sequence
values and page LSNs from the remote publisher, updates the corresponding
sequences on the local subscriber, and finally marks each sequence as
READY upon successful synchronization.

Sequence synchronization occurs in 3 places:
1) CREATE SUBSCRIPTION
    - The command syntax remains unchanged.
    - The subscriber retrieves sequences associated with publications.
    - Published sequences are added to pg_subscription_rel with INIT
      state.
    - Initiate the sequencesync worker to synchronize all sequences.

2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION
    - The command syntax remains unchanged.
    - Dropped published sequences are removed from pg_subscription_rel.
    - Newly published sequences are added to pg_subscription_rel with INIT
      state.
    - Initiate the sequencesync worker to synchronize only newly added
      sequences.

3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES
    - A new command introduced for PG19 by f0b3573c3a.
    - All sequences in pg_subscription_rel are reset to INIT state.
    - Initiate the sequencesync worker to synchronize all sequences.
    - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command,
      addition and removal of missing sequences will not be done in this
      case.

Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-11-05 05:54:25 +00:00
parent 1fd981f053
commit 5509055d69
19 changed files with 1229 additions and 126 deletions

View File

@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202510281
#define CATALOG_VERSION_NO 202511051
#endif

View File

@@ -3433,7 +3433,7 @@
proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u',
prorettype => 'int8', proargtypes => 'regclass',
prosrc => 'pg_sequence_last_value' },
{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump',
{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization',
proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u',
prorettype => 'record', proargtypes => 'regclass',
proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}',

View File

@@ -82,6 +82,29 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
/*
* Holds local sequence identity and corresponding publisher values used during
* sequence synchronization.
*/
typedef struct LogicalRepSequenceInfo
{
/* Sequence information retrieved from the local node */
char *seqname;
char *nspname;
Oid localrelid;
/* Sequence information retrieved from the publisher node */
XLogRecPtr page_lsn;
int64 last_value;
bool is_called;
/*
* True if the sequence identified by nspname + seqname exists on the
* publisher.
*/
bool found_on_pub;
} LogicalRepSequenceInfo;
extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool retain_lock);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,

View File

@@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
extern void SetSequence(Oid relid, int64 next, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *record);

View File

@@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
extern void TablesyncWorkerMain(Datum main_arg);
extern void TableSyncWorkerMain(Datum main_arg);
extern void SequenceSyncWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);

View File

@@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType
{
WORKERTYPE_UNKNOWN = 0,
WORKERTYPE_TABLESYNC,
WORKERTYPE_SEQUENCESYNC,
WORKERTYPE_APPLY,
WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;
@@ -106,6 +107,8 @@ typedef struct LogicalRepWorker
TimestampTz last_recv_time;
XLogRecPtr reply_lsn;
TimestampTz reply_time;
TimestampTz last_seqsync_start_time;
} LogicalRepWorker;
/*
@@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern void logicalrep_reset_seqsync_start_time(void);
extern int logicalrep_sync_worker_count(Oid subid);
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
@@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
extern void ProcessSequencesForSync(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
Oid relid, TimestampTz *last_start_time);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
extern bool FetchRelationStates(bool *started_tx);
extern void FetchRelationStates(bool *has_pending_subtables,
bool *has_pending_sequences, bool *started_tx);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
@@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
#define isTableSyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
#define isSequenceSyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_SEQUENCESYNC)
static inline bool
am_tablesync_worker(void)
{
return isTablesyncWorker(MyLogicalRepWorker);
return isTableSyncWorker(MyLogicalRepWorker);
}
static inline bool
am_sequencesync_worker(void)
{
return isSequenceSyncWorker(MyLogicalRepWorker);
}
static inline bool