diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index a0974d71de1..1f45444b499 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1082,7 +1082,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub_remove_rels = lappend(sub_remove_rels, remove_rel); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid); /* * For READY state, we would have already dropped the @@ -2134,7 +2134,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->type, w->subid, w->relid); } list_free(subworkers); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe86e2..95b5cae9a55 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -245,20 +245,25 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, } /* - * Walks the workers array and searches for one that matches given - * subscription id and relid. + * Walks the workers array and searches for one that matches given worker type, + * subscription id, and relation id. * - * We are only interested in the leader apply worker or table sync worker. + * For apply workers, the relid should be set to InvalidOid, as they manage + * changes across all tables. For table sync workers, the relid should be set + * to the OID of the relation being synchronized. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, + bool only_running) { int i; LogicalRepWorker *res = NULL; + /* relid must be valid only for table sync workers */ + Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - /* Search for attached worker for a given subscription id. */ + /* Search for an attached worker that matches the specified criteria. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; @@ -268,7 +273,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) continue; if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + w->type == wtype && (!only_running || w->proc)) { res = w; break; @@ -627,16 +632,20 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) } /* - * Stop the logical replication worker for subid/relid, if any. + * Stop the logical replication worker that matches the specified worker type, + * subscription id, and relation id. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid) { LogicalRepWorker *worker; + /* relid must be valid only for table sync workers */ + Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(wtype, subid, relid, false); if (worker) { @@ -694,16 +703,20 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) } /* - * Wake up (using latch) any logical replication worker for specified sub/rel. + * Wake up (using latch) any logical replication worker that matches the + * specified worker type, subscription id, and relation id. */ void -logicalrep_worker_wakeup(Oid subid, Oid relid) +logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid) { LogicalRepWorker *worker; + /* relid must be valid only for table sync workers */ + Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(wtype, subid, relid, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -1260,7 +1273,8 @@ ApplyLauncherMain(Datum main_arg) continue; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid, + false); if (w != NULL) { diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index e452a1e78d4..ae8c9385916 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -69,7 +69,8 @@ FinishSyncWorker(void) CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, + InvalidOid); /* Stop gracefully */ proc_exit(0); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 40e1ed3c20e..58c98488d7b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -160,7 +160,8 @@ wait_for_table_state_change(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, + worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->subid, relid, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -207,8 +208,9 @@ wait_for_worker_state_change(char expected_state) * waiting. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + worker = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); @@ -476,7 +478,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->subid, rstate->relid, false); if (syncworker) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5df5a4612b6..7edd1c9cf06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1817,7 +1817,8 @@ apply_handle_stream_start(StringInfo s) * Signal the leader apply worker, as it may be waiting for * us. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + logicalrep_worker_wakeup(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid); } parallel_stream_nchanges = 0; @@ -3284,8 +3285,9 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, * maybe_advance_nonremovable_xid() for details). */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - leader = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + leader = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + false); if (!leader) { ereport(ERROR, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index ae352f6e691..e23fa9a4514 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -254,7 +254,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; extern void logicalrep_worker_attach(int slot); -extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, +extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, + Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock); @@ -263,9 +264,11 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples); -extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, + Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); -extern void logicalrep_worker_wakeup(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, + Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid);