1
0
mirror of https://github.com/postgres/postgres.git synced 2025-12-21 05:21:08 +03:00

Implement pg_wal_replay_wait() stored procedure

pg_wal_replay_wait() is to be used on standby and specifies waiting for
the specific WAL location to be replayed before starting the transaction.
This option is useful when the user makes some data changes on primary and
needs a guarantee to see these changes on standby.

The queue of waiters is stored in a shared memory array sorted by LSN.
During WAL replay, waiters whose LSNs have already been replayed are deleted
from the shared memory array and woken up by setting their latches.

pg_wal_replay_wait() needs to wait without any snapshot held.  Otherwise,
the snapshot could prevent the replay of WAL records implying a kind of
self-deadlock.  This is why it is only possible to implement
pg_wal_replay_wait() as a procedure working in a non-atomic context,
not a function.

Catversion is bumped.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveira
This commit is contained in:
Alexander Korotkov
2024-04-02 22:48:03 +03:00
parent 6faca9ae28
commit 06c418e163
16 changed files with 648 additions and 2 deletions

View File

@@ -66,6 +66,7 @@
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
#include "commands/waitlsn.h"
#include "common/controldata_utils.h"
#include "common/file_utils.h"
#include "executor/instrument.h"
@@ -6040,6 +6041,12 @@ StartupXLOG(void)
UpdateControlFile();
LWLockRelease(ControlFileLock);
/*
* Wake up all waiters for replay LSN. They need to report an error that
* recovery was ended before achieving the target LSN.
*/
WaitLSNSetLatches(InvalidXLogRecPtr);
/*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())

View File

@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
#include "commands/waitlsn.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1828,6 +1829,16 @@ PerformWalRecovery(void)
break;
}
/*
* If we replayed an LSN that someone was waiting for then walk
* over the shared memory array and set latches to notify the
* waiters.
*/
if (waitLSN &&
(XLogRecoveryCtl->lastReplayedEndRecPtr >=
pg_atomic_read_u64(&waitLSN->minLSN)))
WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);

View File

@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE;
-- Wait until target_lsn has been replayed.  timeout is in milliseconds;
-- the default of 0 means wait without a timeout.
CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0)
LANGUAGE internal AS 'pg_wal_replay_wait';
CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
OUT lsn pg_lsn, OUT xid xid, OUT data text)

View File

@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
view.o
view.o \
waitlsn.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -50,4 +50,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
'waitlsn.c',
)

View File

@@ -0,0 +1,348 @@
/*-------------------------------------------------------------------------
*
* waitlsn.c
* Implements waiting for the given LSN, which is used in
* CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout int8).
*
* Copyright (c) 2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/commands/waitlsn.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <float.h>
#include <math.h>
#include "pgstat.h"
#include "fmgr.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_type.h"
#include "commands/waitlsn.h"
#include "executor/spi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
#include "utils/fmgrprotos.h"
/* Add to / delete from shared memory array */
static void addLSNWaiter(XLogRecPtr lsn);
static void deleteLSNWaiter(void);
/* Pointer to the shared WaitLSNState; NULL until WaitLSNShmemInit() sets it */
struct WaitLSNState *waitLSN = NULL;
/*
 * Set by WaitForLSN() after it registers this backend with addLSNWaiter();
 * checked by WaitLSNCleanup() to decide whether an entry must be removed.
 */
static volatile sig_atomic_t haveShmemItem = false;
/*
 * Compute how many bytes of shared memory WaitLSNState requires: the fixed
 * part of the struct up to procInfos, plus one WaitLSNProcInfo slot for each
 * of the MaxBackends possible backends.
 */
Size
WaitLSNShmemSize(void)
{
	Size		fixedPart = offsetof(WaitLSNState, procInfos);
	Size		slotsPart = mul_size(MaxBackends, sizeof(WaitLSNProcInfo));

	return add_size(fixedPart, slotsPart);
}
/*
 * Obtain the WaitLSNState area via ShmemInitStruct() and point waitLSN at
 * it.  If the area was not found (i.e. it has just been created), set it up
 * with an empty waiter list and minLSN at its "nobody waits" value.
 */
void
WaitLSNShmemInit(void)
{
	bool		alreadyPresent;

	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
											   WaitLSNShmemSize(),
											   &alreadyPresent);

	/* Nothing more to do when the struct was already initialized */
	if (alreadyPresent)
		return;

	SpinLockInit(&waitLSN->mutex);
	waitLSN->numWaitedProcs = 0;
	pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
}
/*
 * Register the current backend (MyProcNumber) as a waiter for 'lsn'.
 *
 * The new entry is inserted into waitLSN->procInfos so that the array stays
 * sorted by waitLSN in ascending order, and waitLSN->minLSN is refreshed to
 * the smallest waited LSN.  Everything happens under waitLSN->mutex.
 */
static void
addLSNWaiter(XLogRecPtr lsn)
{
	WaitLSNProcInfo cur;
	int			i;

	SpinLockAcquire(&waitLSN->mutex);

	cur.procnum = MyProcNumber;
	cur.waitLSN = lsn;

	/*
	 * Insertion into the sorted array: once we meet the first element that is
	 * not less than the new one, we put the carried element there and keep
	 * carrying the displaced one forward, shifting the tail by one slot.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs; i++)
	{
		if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
		{
			WaitLSNProcInfo tmp;

			tmp = waitLSN->procInfos[i];
			waitLSN->procInfos[i] = cur;
			cur = tmp;
		}
	}

	/* The carried element (the largest one) goes into the new last slot */
	waitLSN->procInfos[i] = cur;
	waitLSN->numWaitedProcs++;

	/*
	 * The array is sorted in ascending order, so the minimal waited LSN is
	 * always in the first element.  (Index i is the last element here, i.e.
	 * the maximum, and must not be used for minLSN.)
	 */
	pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);

	SpinLockRelease(&waitLSN->mutex);
}
/*
 * Remove the current backend's (MyProcNumber) entry from the shared memory
 * array, if it is there, and refresh waitLSN->minLSN.
 *
 * It is not an error if no entry is found: the function then returns without
 * changing anything.
 */
static void
deleteLSNWaiter(void)
{
	int			i;
	bool		found = false;

	SpinLockAcquire(&waitLSN->mutex);

	/*
	 * Walk the array; from the position of our entry onwards, move every
	 * following element one slot towards the beginning.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs; i++)
	{
		if (waitLSN->procInfos[i].procnum == MyProcNumber)
			found = true;

		if (found && i < waitLSN->numWaitedProcs - 1)
			waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
	}

	if (!found)
	{
		SpinLockRelease(&waitLSN->mutex);
		return;
	}

	waitLSN->numWaitedProcs--;

	/*
	 * The array is kept sorted in ascending order, so the new minimum is the
	 * first element.  The loop counter points past the valid entries at this
	 * point and must not be used as an index.
	 */
	if (waitLSN->numWaitedProcs != 0)
		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
	else
		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);

	SpinLockRelease(&waitLSN->mutex);
}
/*
 * Wake up LSN waiters.
 *
 * Sets the latches of all waiters whose waited LSN is not greater than
 * 'currentLSN', and removes them from the shared memory array.  When
 * 'currentLSN' is InvalidXLogRecPtr, all waiters are woken up and removed.
 * waitLSN->minLSN is refreshed to the smallest remaining waited LSN, or to
 * PG_UINT64_MAX when nobody is left.
 */
void
WaitLSNSetLatches(XLogRecPtr currentLSN)
{
	int			i;
	int		   *wakeUpProcNums;
	int			numWakeUpProcs;

	/* Allocate before taking the spinlock; one slot per possible backend */
	wakeUpProcNums = palloc(sizeof(int) * MaxBackends);

	SpinLockAcquire(&waitLSN->mutex);

	/*
	 * Remember processes, whose waited LSNs are already replayed.  We should
	 * set their latches later after spinlock release.  The array is sorted
	 * by LSN, so we can stop at the first entry that is still in the future.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs; i++)
	{
		if (!XLogRecPtrIsInvalid(currentLSN) &&
			waitLSN->procInfos[i].waitLSN > currentLSN)
			break;

		wakeUpProcNums[i] = waitLSN->procInfos[i].procnum;
	}

	/*
	 * Immediately remove those processes from the shmem array.  Otherwise,
	 * shmem array items will be here till corresponding processes wake up and
	 * delete themselves.
	 */
	numWakeUpProcs = i;
	for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
		waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
	waitLSN->numWaitedProcs -= numWakeUpProcs;

	/*
	 * After compaction the remaining entries start at index 0 and are still
	 * sorted, so the minimum is the first element.  The loop counter equals
	 * the new element count here and would address a stale slot.
	 */
	if (waitLSN->numWaitedProcs != 0)
		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
	else
		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);

	SpinLockRelease(&waitLSN->mutex);

	/*
	 * Set latches for processes, whose waited LSNs are already replayed. This
	 * involves spinlocks.  So, we shouldn't do this under a spinlock.
	 */
	for (i = 0; i < numWakeUpProcs; i++)
	{
		PGPROC	   *backend;

		backend = GetPGProcByNumber(wakeUpProcNums[i]);
		SetLatch(&backend->procLatch);
	}

	pfree(wakeUpProcNums);
}
/*
 * Remove this backend's entry from the shmem array, but only when the
 * haveShmemItem flag says we have registered one.
 */
void
WaitLSNCleanup(void)
{
	if (!haveShmemItem)
		return;

	deleteLSNWaiter();
}
/*
 * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
 * timeout happens.
 *
 * targetLSN - the WAL location whose replay we are waiting for.
 * timeout   - maximum time to wait in milliseconds; a value that is not
 *             positive means waiting without a timeout.
 *
 * Raises an ERROR if recovery is not in progress (either on entry or while
 * waiting), or if the timeout expires before targetLSN is replayed.  Returns
 * normally once targetLSN has been replayed.
 *
 * Our entry in the shared memory array is always removed before leaving
 * through one of the exits coded in this function.
 *
 * NOTE: an ERROR thrown from CHECK_FOR_INTERRUPTS() still leaves the entry
 * registered with haveShmemItem set; it is dropped at the start of the next
 * call of this function, by WaitLSNSetLatches(), or by WaitLSNCleanup().
 */
void
WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;	/* only meaningful when timeout > 0 */

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSN);

	/* Should be only called by a backend */
	Assert(MyBackendType == B_BACKEND);

	/*
	 * Drop an entry possibly left behind by a previous call that was aborted
	 * by an error, so that we never have more than one entry per backend.
	 */
	if (haveShmemItem)
	{
		deleteLSNWaiter();
		haveShmemItem = false;
	}

	if (!RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is not in progress"),
				 errhint("Waiting for LSN can only be executed during recovery.")));

	/* If target LSN is already replayed, exit immediately */
	if (targetLSN <= GetXLogReplayRecPtr(NULL))
		return;

	if (timeout > 0)
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);

	addLSNWaiter(targetLSN);
	haveShmemItem = true;

	for (;;)
	{
		int			rc;
		int			latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
		long		delay_ms = 0;

		/* Check if the waited LSN has been replayed */
		currentLSN = GetXLogReplayRecPtr(NULL);
		if (targetLSN <= currentLSN)
			break;

		/* Recheck that recovery is still in-progress */
		if (!RecoveryInProgress())
		{
			/* Don't leave our entry behind when bailing out */
			deleteLSNWaiter();
			haveShmemItem = false;

			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("recovery is not in progress"),
					 errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
							   LSN_FORMAT_ARGS(targetLSN),
							   LSN_FORMAT_ARGS(currentLSN))));
		}

		if (timeout > 0)
		{
			/* TimestampTz difference is divided by 1000 to get milliseconds */
			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
			latch_events |= WL_TIMEOUT;
			if (delay_ms <= 0)
				break;
		}

		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, latch_events, delay_ms,
					   WAIT_EVENT_WAIT_FOR_WAL_REPLAY);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Remove our entry unconditionally.  On timeout it is certainly still in
	 * the array.  On success it has normally been removed by
	 * WaitLSNSetLatches() already, but we might have observed the new replay
	 * position before that function ran; deleteLSNWaiter() is a no-op when
	 * the entry is gone.
	 */
	deleteLSNWaiter();
	haveShmemItem = false;

	if (targetLSN > currentLSN)
		ereport(ERROR,
				(errcode(ERRCODE_QUERY_CANCELED),
				 errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
						LSN_FORMAT_ARGS(targetLSN),
						LSN_FORMAT_ARGS(currentLSN))));
}
/*
 * SQL-callable entry point of the pg_wal_replay_wait() procedure.
 *
 * Argument 0 is the target LSN, argument 1 is the timeout passed on to
 * WaitForLSN().  A negative timeout and a call from an atomic context are
 * both rejected with an ERROR.
 */
Datum
pg_wal_replay_wait(PG_FUNCTION_ARGS)
{
	XLogRecPtr	lsnToWaitFor = PG_GETARG_LSN(0);
	int64		waitTimeout = PG_GETARG_INT64(1);
	CallContext *callCtx = (CallContext *) fcinfo->context;

	if (waitTimeout < 0)
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("\"timeout\" must not be negative")));

	/*
	 * While waiting for WAL replay we must hold no snapshot, so that our
	 * MyProc->xmin is invalid.  A snapshot held by us could block the replay
	 * of WAL records, which would make us wait for ourselves.  That is why
	 * pg_wal_replay_wait() is a procedure rather than a function.
	 *
	 * Step one: refuse to run in an atomic context, i.e. when the call is
	 * wrapped into a transaction, another procedure call, or a function
	 * call.
	 *
	 * Step two: an active snapshot may still be set for the call itself
	 * (see PlannedStmtRequiresSnapshot()).  We pop it; PortalRunUtility()
	 * can tolerate this.
	 */
	if (callCtx->atomic)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("pg_wal_replay_wait() must be only called in non-atomic context"),
				 errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));

	if (ActiveSnapshotSet())
		PopActiveSnapshot();
	Assert(!ActiveSnapshotSet());

	/* Get rid of the catalog snapshot as well, then verify xmin is clear */
	InvalidateCatalogSnapshot();
	Assert(MyProc->xmin == InvalidTransactionId);

	WaitForLSN(lsnToWaitFor, waitTimeout);

	PG_RETURN_VOID();
}

View File

@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, WaitEventExtensionShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
size = add_size(size, WaitLSNShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
/* Initialize subsystems */
CreateOrAttachShmemStructs();
/*
* Initialize the shared memory state used by backends waiting for WAL
* replay to reach a given LSN.
*/
WaitLSNShmemInit();
#ifdef EXEC_BACKEND
/*

View File

@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
/*
* Cleanup waiting for LSN if any.
*/
WaitLSNCleanup();
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();

View File

@@ -79,6 +79,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
WAIT_FOR_WAL_REPLAY "Waiting for a replay of the particular WAL position on the physical standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."