1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Reset the logical worker type while cleaning up other worker info.

Commit 2a8b40e36 introduces the worker type field for logical replication
workers, but forgot to reset the type when the worker exits. This can lead
to recognizing a stopped worker as a valid logical replication worker.

Fix it by resetting the worker type and additionally adding the safeguard
to not use LogicalRepWorker until ->in_use is verified.

Reported-by: Thomas Munro based on cfbot reports.
Author: Hou Zhijie, Alvaro Herrera
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/CA+hUKGK2RQh4LifVgBmkHsCYChP-65UwGXOmnCzYVa5aAt4GWg@mail.gmail.com
This commit is contained in:
Amit Kapila
2023-08-25 08:57:55 +05:30
parent 252dcb3239
commit 9c13b6814a
2 changed files with 9 additions and 4 deletions

View File

@@ -793,6 +793,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
{ {
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
worker->type = WORKERTYPE_UNKNOWN;
worker->in_use = false; worker->in_use = false;
worker->proc = NULL; worker->proc = NULL;
worker->dbid = InvalidOid; worker->dbid = InvalidOid;
@@ -862,7 +863,7 @@ logicalrep_sync_worker_count(Oid subid)
{ {
LogicalRepWorker *w = &LogicalRepCtx->workers[i]; LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->subid == subid && isTablesyncWorker(w)) if (isTablesyncWorker(w) && w->subid == subid)
res++; res++;
} }
@@ -889,7 +890,7 @@ logicalrep_pa_worker_count(Oid subid)
{ {
LogicalRepWorker *w = &LogicalRepCtx->workers[i]; LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->subid == subid && isParallelApplyWorker(w)) if (isParallelApplyWorker(w) && w->subid == subid)
res++; res++;
} }

View File

@@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn); XLogRecPtr remote_lsn);
#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isParallelApplyWorker(worker) ((worker)->in_use && \
#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC) (worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
static inline bool static inline bool
am_tablesync_worker(void) am_tablesync_worker(void)
@@ -339,12 +341,14 @@ am_tablesync_worker(void)
static inline bool static inline bool
am_leader_apply_worker(void) am_leader_apply_worker(void)
{ {
Assert(MyLogicalRepWorker->in_use);
return (MyLogicalRepWorker->type == WORKERTYPE_APPLY); return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
} }
static inline bool static inline bool
am_parallel_apply_worker(void) am_parallel_apply_worker(void)
{ {
Assert(MyLogicalRepWorker->in_use);
return isParallelApplyWorker(MyLogicalRepWorker); return isParallelApplyWorker(MyLogicalRepWorker);
} }