From 3e8e05596a020f043f1efd6406e4511ea85170bd Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 28 Oct 2025 05:47:50 +0000 Subject: [PATCH] Add worker type argument to logical replication worker functions. Extend logicalrep_worker_stop, logicalrep_worker_wakeup, and logicalrep_worker_find to accept a worker type argument. This change enables differentiation between logical replication worker types, such as apply workers and table sync workers. While preserving existing behavior, it lays the groundwork for upcoming patch to add sequence synchronization workers. Author: Vignesh C Reviewed-by: shveta malik Reviewed-by: Peter Smith Reviewed-by: Chao Li Reviewed-by: Hayato Kuroda Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com --- src/backend/commands/subscriptioncmds.c | 4 +-- src/backend/replication/logical/launcher.c | 40 ++++++++++++++------- src/backend/replication/logical/syncutils.c | 3 +- src/backend/replication/logical/tablesync.c | 11 +++--- src/backend/replication/logical/worker.c | 8 +++-- src/include/replication/worker_internal.h | 9 +++-- 6 files changed, 49 insertions(+), 26 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index a0974d71de1..1f45444b499 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1082,7 +1082,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub_remove_rels = lappend(sub_remove_rels, remove_rel); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid); /* * For READY state, we would have already dropped the @@ -2134,7 +2134,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->type, w->subid, w->relid); } list_free(subworkers); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe86e2..95b5cae9a55 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -245,20 +245,25 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, } /* - * Walks the workers array and searches for one that matches given - * subscription id and relid. + * Walks the workers array and searches for one that matches given worker type, + * subscription id, and relation id. * - * We are only interested in the leader apply worker or table sync worker. + * For apply workers, the relid should be set to InvalidOid, as they manage + * changes across all tables. For table sync workers, the relid should be set + * to the OID of the relation being synchronized. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, + bool only_running) { int i; LogicalRepWorker *res = NULL; + /* relid must be valid only for table sync workers */ + Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - /* Search for attached worker for a given subscription id. */ + /* Search for an attached worker that matches the specified criteria. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; @@ -268,7 +273,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) continue; if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + w->type == wtype && (!only_running || w->proc)) { res = w; break; @@ -627,16 +632,20 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) } /* - * Stop the logical replication worker for subid/relid, if any. + * Stop the logical replication worker that matches the specified worker type, + * subscription id, and relation id. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid) { LogicalRepWorker *worker; + /* relid must be valid only for table sync workers */ + Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(wtype, subid, relid, false); if (worker) { @@ -694,16 +703,20 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) } /* - * Wake up (using latch) any logical replication worker for specified sub/rel. + * Wake up (using latch) any logical replication worker that matches the + * specified worker type, subscription id, and relation id. */ void -logicalrep_worker_wakeup(Oid subid, Oid relid) +logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid) { LogicalRepWorker *worker; + /* relid must be valid only for table sync workers */ + Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(wtype, subid, relid, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -1260,7 +1273,8 @@ ApplyLauncherMain(Datum main_arg) continue; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid, + false); if (w != NULL) { diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index e452a1e78d4..ae8c9385916 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -69,7 +69,8 @@ FinishSyncWorker(void) CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, + InvalidOid); /* Stop gracefully */ proc_exit(0); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 40e1ed3c20e..58c98488d7b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -160,7 +160,8 @@ wait_for_table_state_change(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, + worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->subid, relid, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -207,8 +208,9 @@ wait_for_worker_state_change(char expected_state) * waiting. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + worker = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); @@ -476,7 +478,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->subid, rstate->relid, false); if (syncworker) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5df5a4612b6..7edd1c9cf06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1817,7 +1817,8 @@ apply_handle_stream_start(StringInfo s) * Signal the leader apply worker, as it may be waiting for * us. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + logicalrep_worker_wakeup(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid); } parallel_stream_nchanges = 0; @@ -3284,8 +3285,9 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, * maybe_advance_nonremovable_xid() for details). */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - leader = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + leader = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + false); if (!leader) { ereport(ERROR, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index ae352f6e691..e23fa9a4514 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -254,7 +254,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; extern void logicalrep_worker_attach(int slot); -extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, +extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, + Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock); @@ -263,9 +264,11 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples); -extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, + Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); -extern void logicalrep_worker_wakeup(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, + Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid);