diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index be402613936..63a818140bd 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -285,6 +285,21 @@ InitWalSender(void) MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); + /* + * If the client didn't specify a database to connect to, show in PGPROC + * that our advertised xmin should affect vacuum horizons in all + * databases. This allows physical replication clients to send hot + * standby feedback that will delay vacuum cleanup in all databases. + */ + if (MyDatabaseId == InvalidOid) + { + Assert(MyProc->xmin == InvalidTransactionId); + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags |= PROC_AFFECTS_ALL_HORIZONS; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); + } + /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); } diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index f6e98aae298..25c310f6757 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1810,21 +1810,28 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) TransactionIdOlder(h->shared_oldest_nonremovable, xmin); /* - * Normally queries in other databases are ignored for anything but - * the shared horizon. But in recovery we cannot compute an accurate - * per-database horizon as all xids are managed via the - * KnownAssignedXids machinery. + * Normally sessions in other databases are ignored for anything but + * the shared horizon. * - * Be careful to compute a pessimistic value when MyDatabaseId is not - * set. If this is a backend in the process of starting up, we may not - * use a "too aggressive" horizon (otherwise we could end up using it - * to prune still needed data away). If the current backend never - * connects to a database that is harmless, because - * data_oldest_nonremovable will never be utilized. + * However, include them when MyDatabaseId is not (yet) set. A + * backend in the process of starting up must not compute a "too + * aggressive" horizon, otherwise we could end up using it to prune + * still-needed data away. If the current backend never connects to a + * database this is harmless, because data_oldest_nonremovable will + * never be utilized. + * + * Also, sessions marked with PROC_AFFECTS_ALL_HORIZONS should always + * be included. (This flag is used for hot standby feedback, which + * can't be tied to a specific database.) + * + * Also, while in recovery we cannot compute an accurate per-database + * horizon, as all xids are managed via the KnownAssignedXids + * machinery. */ - if (in_recovery || - MyDatabaseId == InvalidOid || proc->databaseId == MyDatabaseId || - proc->databaseId == 0) /* always include WalSender */ + if (proc->databaseId == MyDatabaseId || + MyDatabaseId == InvalidOid || + (statusFlags & PROC_AFFECTS_ALL_HORIZONS) || + in_recovery) { /* * We can ignore this backend if it's running CREATE INDEX @@ -2681,10 +2688,14 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc) /* Install xmin */ MyProc->xmin = TransactionXmin = xmin; - /* Flags being copied must be valid copy-able flags. */ - Assert((proc->statusFlags & (~PROC_COPYABLE_FLAGS)) == 0); - MyProc->statusFlags = proc->statusFlags; - ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + /* walsender cheats by passing proc == MyProc, don't check its flags */ + if (proc != MyProc) + { + /* Flags being copied must be valid copy-able flags. */ + Assert((proc->statusFlags & (~PROC_COPYABLE_FLAGS)) == 0); + MyProc->statusFlags = proc->statusFlags; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + } result = true; } diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 809b74db5e6..15be21c00a6 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -60,6 +60,9 @@ struct XidCache #define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical * decoding outside xact */ +#define PROC_AFFECTS_ALL_HORIZONS 0x20 /* this proc's xmin must be + * included in vacuum horizons + * in all databases */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \