diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index b47d8b4106e..742deb037b7 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1376,6 +1376,60 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)' + + Read-Your-Writes Consistency + + + In asynchronous replication, there is always a short window where changes + on the primary may not yet be visible on the standby due to replication + lag. This can lead to inconsistencies when an application writes data on + the primary and then immediately issues a read query on the standby. + However, it is possible to address this without switching to synchronous + replication. + + + + To address this, PostgreSQL offers a mechanism for read-your-writes + consistency. The key idea is to ensure that a client sees its own writes + by synchronizing the WAL replay on the standby with the known point of + change on the primary. + + + + This is achieved by the following steps. After performing write + operations, the application retrieves the current WAL location using a + function call like this. + + +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) + + + + + The LSN obtained from the primary is then communicated + to the standby server. This can be managed at the application level or + via the connection pooler. On the standby, the application issues the + command to block further processing until + the standby's WAL replay process reaches (or exceeds) the specified + LSN. + + +postgres=# WAIT FOR LSN '0/306EE20'; + status +-------- + success +(1 row) + + Once the command returns a status of success, it guarantees that all + changes up to the provided LSN have been applied, + ensuring that subsequent read queries will reflect the latest updates. 
+ + + Continuous Archiving in Standby diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index f5be638867a..e167406c744 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory. + diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml new file mode 100644 index 00000000000..3b8e842d1de --- /dev/null +++ b/doc/src/sgml/ref/wait_for.sgml @@ -0,0 +1,234 @@ + + + + + WAIT FOR + + + + WAIT FOR + 7 + SQL - Language Statements + + + + WAIT FOR + wait for target LSN to be replayed, optionally with a timeout + + + + +WAIT FOR LSN 'lsn' [ WITH ( option [, ...] ) ] + +where option can be: + + TIMEOUT 'timeout' + NO_THROW + + + + + Description + + + Waits until recovery replays lsn. + If no timeout is specified or it is set to + zero, this command waits indefinitely for the + lsn. + On timeout, or if the server is promoted before + lsn is reached, an error is emitted, + unless NO_THROW is specified in the WITH clause. + If NO_THROW is specified, then the command + doesn't throw errors. + + + + The possible return values are success, + timeout, and not in recovery. + + + + + Parameters + + + + lsn + + + Specifies the target LSN to wait for. + + + + + + WITH ( option [, ...] ) + + + This clause specifies optional parameters for the wait operation. + The following parameters are supported: + + + + TIMEOUT 'timeout' + + + When specified and timeout is greater than zero, + the command waits until lsn is reached or + the specified timeout has elapsed. + + + The timeout might be given as an integer number of + milliseconds. Also it might be given as a string literal with + an integer number of milliseconds or a number with a unit + (see ). + + + + + + NO_THROW + + + Specify to not throw an error in the case of timeout or + running on the primary. In this case the result status can be obtained from + the return value. 
+ + + + + + + + + + + + Outputs + + + + success + + + This return value denotes that we have successfully reached + the target lsn. + + + + + + timeout + + + This return value denotes that the timeout happened before reaching + the target lsn. + + + + + + not in recovery + + + This return value denotes that the database server is not in a recovery + state. This might mean either the database server was not in recovery + at the moment of receiving the command, or it was promoted before + reaching the target lsn. + + + + + + + + Notes + + + WAIT FOR command waits till + lsn to be replayed on standby. + That is, after this command execution, the value returned by + pg_last_wal_replay_lsn should be greater or equal + to the lsn value. This is useful to achieve + read-your-writes-consistency, while using async replica for reads and + primary for writes. In that case, the lsn of the last + modification should be stored on the client application side or the + connection pooler side. + + + + WAIT FOR command should be called on standby. + If a user runs WAIT FOR on primary, it + will error out unless NO_THROW is specified in the WITH clause. + However, if WAIT FOR is + called on primary promoted from standby and lsn + was already replayed, then the WAIT FOR command just + exits immediately. + + + + + + Examples + + + You can use WAIT FOR command to wait for + the pg_lsn value. For example, an application could update + the movie table and get the lsn after + changes just made. This example uses pg_current_wal_insert_lsn + on primary server to get the lsn given that + synchronous_commit could be set to + off. + + +postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; +UPDATE 100 +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) + + + Then an application could run WAIT FOR + with the lsn obtained from primary. 
After that the + changes made on primary should be guaranteed to be visible on replica. + + +postgres=# WAIT FOR LSN '0/306EE20'; + status +-------- + success +(1 row) +postgres=# SELECT * FROM movie WHERE genre = 'Drama'; + genre +------- +(0 rows) + + + + + If the target LSN is not reached before the timeout, the error is thrown. + + +postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s'); +ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60 + + + + + The same example uses WAIT FOR with + NO_THROW option. + +postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW); + status +-------- + timeout +(1 row) + + + + diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index ff85ace83fc..2cf02c37b17 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -216,6 +216,7 @@ &update; &vacuum; &values; + &waitFor; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2cf3d4e92b7..092e197eba3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -2843,6 +2844,11 @@ AbortTransaction(void) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. 
+ */ + WaitLSNCleanup(); + /* Clear wait information and command progress indicator */ pgstat_report_wait_end(); pgstat_progress_end_command(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9900e3e0179..7c959051e11 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,6 +62,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -6227,6 +6228,12 @@ StartupXLOG(void) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * Wake up all waiters for replay LSN. They need to report an error that + * recovery was ended before reaching the target LSN. + */ + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr); + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 93c50831b26..550de6e4a59 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,6 +40,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" @@ -1838,6 +1839,16 @@ PerformWalRecovery(void) break; } + /* + * If we replayed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. 
+ */ + if (waitLSNState && + (XLogRecoveryCtl->lastReplayedEndRecPtr >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cb2fbdc7c60..f99acfd2b4b 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -64,6 +64,7 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index dd4cde41d32..9f640ad4810 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -53,4 +53,5 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', + 'wait.c', ) diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 00000000000..67068a92dbf --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,212 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR, which allows waiting for events such as + * time passing or LSN having been replayed on replica. 
+ * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "commands/defrem.h" +#include "commands/wait.h" +#include "executor/executor.h" +#include "parser/parse_node.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +void +ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) +{ + XLogRecPtr lsn; + int64 timeout = 0; + WaitLSNResult waitLSNResult; + bool throw = true; + TupleDesc tupdesc; + TupOutputState *tstate; + const char *result = ""; + bool timeout_specified = false; + bool no_throw_specified = false; + + /* Parse and validate the mandatory LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(stmt->lsn_literal))); + + foreach_node(DefElem, defel, stmt->options) + { + if (strcmp(defel->defname, "timeout") == 0) + { + char *timeout_str; + const char *hintmsg; + double result; + + if (timeout_specified) + errorConflictingDefElem(defel, pstate); + timeout_specified = true; + + timeout_str = defGetString(defel); + + if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg)) + { + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid timeout value: \"%s\"", timeout_str), + hintmsg ? errhint("%s", _(hintmsg)) : 0); + } + + /* + * Get rid of any fractional part in the input. This is so we + * don't fail on just-out-of-range values that would round into + * range. 
+ */ + result = rint(result); + + /* Range check */ + if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result))) + ereport(ERROR, + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("timeout value is out of range")); + + if (result < 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("timeout cannot be negative")); + + timeout = (int64) result; + } + else if (strcmp(defel->defname, "no_throw") == 0) + { + if (no_throw_specified) + errorConflictingDefElem(defel, pstate); + + no_throw_specified = true; + + throw = !defGetBoolean(defel); + } + else + { + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"%s\" not recognized", + defel->defname), + parser_errposition(pstate, defel->location)); + } + } + + /* + * We are going to wait for the LSN replay. We should first care that we + * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * Otherwise, our snapshot could prevent the replay of WAL records + * implying a kind of self-deadlock. This is the reason why WAIT FOR is a + * command, not a procedure or function. + * + * At first, we should check there is no active snapshot. According to + * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is + * processed with a snapshot. Thankfully, we can pop this snapshot, + * because PortalRunUtility() can tolerate this. + */ + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + + /* + * At second, invalidate a catalog snapshot if any. And we should be done + * with the preparation. + */ + InvalidateCatalogSnapshot(); + + /* Give up if there is still an active or registered snapshot. 
*/ + if (HaveRegisteredOrActiveSnapshot()) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("WAIT FOR must be only called without an active or registered snapshot"), + errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED.")); + + /* + * As the result we should hold no snapshot, and correspondingly our xmin + * should be unset. + */ + Assert(MyProc->xmin == InvalidTransactionId); + + waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout); + + /* + * Process the result of WaitForLSN(). Throw appropriate error if + * needed. + */ + switch (waitLSNResult) + { + case WAIT_LSN_RESULT_SUCCESS: + /* Nothing to do on success */ + result = "success"; + break; + + case WAIT_LSN_RESULT_TIMEOUT: + if (throw) + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + else + result = "timeout"; + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + if (throw) + { + if (PromoteIsTriggered()) + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + } + else + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the replay LSN can only be executed during recovery.")); + } + else + result = "not in recovery"; + break; + } + + /* need a tuple descriptor representing a single TEXT column */ + tupdesc = WaitStmtResultDesc(stmt); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* Send it */ + do_text_output_oneline(tstate, 
result); + + end_tup_output(tstate); +} + +TupleDesc +WaitStmtResultDesc(WaitStmt *stmt) +{ + TupleDesc tupdesc; + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + return tupdesc; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 8a0470d5b84..57fe0186547 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -308,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt UnlistenStmt UpdateStmt VacuumStmt VariableResetStmt VariableSetStmt VariableShowStmt - ViewStmt CheckPointStmt CreateConversionStmt + ViewStmt WaitStmt CheckPointStmt CreateConversionStmt DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt @@ -325,6 +325,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type opt_concurrently %type opt_drop_behavior %type opt_utility_option_list +%type opt_wait_with_clause %type utility_option_list %type utility_option_elem %type utility_option_name @@ -678,7 +679,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); json_object_constructor_null_clause_opt json_array_constructor_null_clause_opt - /* * Non-keyword token types. These are hard-wired into the "flex" lexer. 
* They must be listed first so that their numeric codes do not depend on @@ -748,7 +748,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN_P MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE MERGE_ACTION METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -792,7 +792,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE @@ -1120,6 +1120,7 @@ stmt: | VariableSetStmt | VariableShowStmt | ViewStmt + | WaitStmt | /*EMPTY*/ { $$ = NULL; } ; @@ -16482,6 +16483,26 @@ xml_passing_mech: | BY VALUE_P ; +/***************************************************************************** + * + * WAIT FOR LSN + * + *****************************************************************************/ + +WaitStmt: + WAIT FOR LSN_P Sconst opt_wait_with_clause + { + WaitStmt *n = makeNode(WaitStmt); + n->lsn_literal = $4; + n->options = $5; + $$ = (Node *) n; + } + ; + +opt_wait_with_clause: + WITH '(' utility_option_list ')' { $$ = $3; } + | /*EMPTY*/ { $$ = NIL; } + ; /* * Aggregate decoration clauses @@ -17969,6 +17990,7 @@ unreserved_keyword: | LOCK_P | LOCKED | LOGGED + | LSN_P | MAPPING | MATCH | MATCHED @@ -18139,6 +18161,7 @@ unreserved_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHITESPACE_P | WITHIN | WITHOUT @@ -18585,6 +18608,7 @@ bare_label_keyword: | LOCK_P | LOCKED | LOGGED + | LSN_P | MAPPING | MATCH | MATCHED @@ 
-18796,6 +18820,7 @@ bare_label_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHEN | WHITESPACE_P | WORK diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 96f29aafc39..26b201eadb8 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,6 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -947,6 +948,11 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Cancel any pending condition variable sleep, too */ ConditionVariableCancelSleep(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 74179139fa9..fde78c55160 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -1158,10 +1158,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, MemoryContextSwitchTo(portal->portalContext); /* - * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from - * under us, so don't complain if it's now empty. Otherwise, our snapshot - * should be the top one; pop it. Note that this could be a different - * snapshot from the one we made above; see EnsurePortalSnapshotExists. + * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot + * stack from under us, so don't complain if it's now empty. Otherwise, + * our snapshot should be the top one; pop it. Note that this could be a + * different snapshot from the one we made above; see + * EnsurePortalSnapshotExists. 
*/ if (portal->portalSnapshot != NULL && ActiveSnapshotSet()) { @@ -1738,7 +1739,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt) IsA(utilityStmt, ListenStmt) || IsA(utilityStmt, NotifyStmt) || IsA(utilityStmt, UnlistenStmt) || - IsA(utilityStmt, CheckPointStmt)) + IsA(utilityStmt, CheckPointStmt) || + IsA(utilityStmt, WaitStmt)) return false; return true; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 918db53dd5e..082967c0a86 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -56,6 +56,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/wait.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree) case T_PrepareStmt: case T_UnlistenStmt: case T_VariableSetStmt: + case T_WaitStmt: { /* * These modify only backend-local state, so they're OK to run @@ -1055,6 +1057,12 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; } + case T_WaitStmt: + { + ExecWaitStmt(pstate, (WaitStmt *) parsetree, dest); + } + break; + default: /* All other statement types have event trigger support */ ProcessUtilitySlow(pstate, pstmt, queryString, @@ -2059,6 +2067,9 @@ UtilityReturnsTuples(Node *parsetree) case T_VariableShowStmt: return true; + case T_WaitStmt: + return true; + default: return false; } @@ -2114,6 +2125,9 @@ UtilityTupleDescriptor(Node *parsetree) return GetPGVariableResultDesc(n->name); } + case T_WaitStmt: + return WaitStmtResultDesc((WaitStmt *) parsetree); + default: return NULL; } @@ -3091,6 +3105,10 @@ CreateCommandTag(Node *parsetree) } break; + case T_WaitStmt: + tag = CMDTAG_WAIT; + break; + /* already-planned queries */ case T_PlannedStmt: { @@ -3689,6 +3707,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_DDL; break; + case T_WaitStmt: + lev = LOGSTMT_ALL; + break; + /* already-planned queries */ case T_PlannedStmt: { diff --git 
a/src/include/commands/wait.h b/src/include/commands/wait.h new file mode 100644 index 00000000000..ce332134fb3 --- /dev/null +++ b/src/include/commands/wait.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * wait.h + * prototypes for commands/wait.c + * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * src/include/commands/wait.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAIT_H +#define WAIT_H + +#include "nodes/parsenodes.h" +#include "parser/parse_node.h" +#include "tcop/dest.h" + +extern void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest); +extern TupleDesc WaitStmtResultDesc(WaitStmt *stmt); + +#endif /* WAIT_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index ecbddd12e1b..d14294a4ece 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4385,4 +4385,12 @@ typedef struct DropSubscriptionStmt DropBehavior behavior; /* RESTRICT or CASCADE behavior */ } DropSubscriptionStmt; +typedef struct WaitStmt +{ + NodeTag type; + char *lsn_literal; /* LSN string from grammar */ + List *options; /* List of DefElem nodes */ +} WaitStmt; + + #endif /* PARSENODES_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 84182eaaae2..5d4fe27ef96 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -270,6 +270,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("lsn", LSN_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL) @@ -496,6 +497,7 
@@ PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("virtual", VIRTUAL, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index d250a714d59..c4606d65043 100644 --- a/src/include/tcop/cmdtaglist.h +++ b/src/include/tcop/cmdtaglist.h @@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false) PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false) PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true) PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false) +PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false) diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 52993c32dbb..523a5cd5b52 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -56,7 +56,8 @@ tests += { 't/045_archive_restartpoint.pl', 't/046_checkpoint_logical_slot.pl', 't/047_checkpoint_physical_slot.pl', - 't/048_vacuum_horizon_floor.pl' + 't/048_vacuum_horizon_floor.pl', + 't/049_wait_for_lsn.pl', ], }, } diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl new file mode 100644 index 00000000000..e0ddb06a2f0 --- /dev/null +++ b/src/test/recovery/t/049_wait_for_lsn.pl @@ -0,0 +1,302 @@ +# Checks waiting for the LSN replay on standby using +# the WAIT FOR command. 
+use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize primary node +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->start; + +# Add some content and take a backup +$node_primary->safe_psql('postgres', + "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +# Create a streaming standby with a 1 second delay from the backup +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +my $delay = 1; +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->append_conf( + 'postgresql.conf', qq[ + recovery_min_apply_delay = '${delay}s' +]); +$node_standby->start; + +# 1. Make sure that WAIT FOR works: add new content to +# primary and memorize primary's insert LSN, then wait for that LSN to be +# replayed on standby. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(11, 20))"); +my $lsn1 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +my $output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn1}' WITH (timeout '1d'); + SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn); +]); + +# Make sure the current LSN on standby is at least as big as the LSN we +# observed on primary before. +ok((split("\n", $output))[-1] >= 0, + "standby reached the same LSN as primary after WAIT FOR"); + +# 2. 
Check that new data is visible after calling WAIT FOR +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(21, 30))"); +my $lsn2 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}'; + SELECT count(*) FROM wait_test; +]); + +# Make sure the count(*) on standby reflects the recent changes on primary +ok((split("\n", $output))[-1] eq 30, + "standby sees the data inserted on primary after WAIT FOR"); + +# 3. Check that waiting for unreachable LSN triggers the timeout. The +# unreachable LSN must be well in advance. So WAL records issued by +# the concurrent autovacuum could not affect that. +my $lsn3 = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn() + 10000000000"); +my $stderr; +$node_standby->safe_psql('postgres', + "WAIT FOR LSN '${lsn2}' WITH (timeout '10ms');"); +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${lsn3}' WITH (timeout '1000ms');", + stderr => \$stderr); +ok( $stderr =~ /timed out while waiting for target LSN/, + "get timeout on waiting for unreachable LSN"); + +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]); +ok($output eq "success", + "WAIT FOR returns correct status after successful waiting"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]); +ok($output eq "timeout", "WAIT FOR returns correct status after timeout"); + +# 4. Check that WAIT FOR triggers an error if called on primary, +# within another function, or inside a transaction with an isolation level +# higher than READ COMMITTED. 
+ +$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';", + stderr => \$stderr); +ok( $stderr =~ /recovery is not in progress/, + "get an error when running on the primary"); + +$node_standby->psql( + 'postgres', + "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 1; WAIT FOR LSN '${lsn3}';", + stderr => \$stderr); +ok( $stderr =~ + /WAIT FOR must be only called without an active or registered snapshot/, + "get an error when running in a transaction with an isolation level higher than READ COMMITTED" +); + +$node_primary->safe_psql( + 'postgres', qq[ +CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$ + BEGIN + EXECUTE format('WAIT FOR LSN %L;', target_lsn); + END +\$\$ +LANGUAGE plpgsql; +]); + +$node_primary->wait_for_catchup($node_standby); +$node_standby->psql( + 'postgres', + "SELECT pg_wal_replay_wait_wrap('${lsn3}');", + stderr => \$stderr); +ok( $stderr =~ + /WAIT FOR must be only called without an active or registered snapshot/, + "get an error when running within another function"); + +# 5. 
# Check parameter validation error cases on standby before promotion
my $test_lsn =
  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");

# Test negative timeout
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' WITH (timeout '-1000ms');",
	stderr => \$stderr);
like(
	$stderr,
	qr/timeout cannot be negative/,
	"get error for negative timeout");

# Test unknown parameter with WITH clause
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' WITH (unknown_param 'value');",
	stderr => \$stderr);
like(
	$stderr,
	qr/option "unknown_param" not recognized/,
	"get error for unknown parameter");

# Test duplicate TIMEOUT parameter with WITH clause
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' WITH (timeout '1000', timeout '2000');",
	stderr => \$stderr);
like(
	$stderr,
	qr/conflicting or redundant options/,
	"get error for duplicate TIMEOUT parameter");

# Test duplicate NO_THROW parameter with WITH clause
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' WITH (no_throw, no_throw);",
	stderr => \$stderr);
like(
	$stderr,
	qr/conflicting or redundant options/,
	"get error for duplicate NO_THROW parameter");

# Test syntax error - options without WITH keyword
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' (timeout '100ms');",
	stderr => \$stderr);
like(
	$stderr,
	qr/syntax error/,
	"get syntax error when options specified without WITH keyword");

# Test syntax error - missing LSN
$node_standby->psql('postgres', "WAIT FOR TIMEOUT 1000;", stderr => \$stderr);
like($stderr, qr/syntax error/, "get syntax error for missing LSN");

# Test invalid LSN format
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN 'invalid_lsn';",
	stderr => \$stderr);
like(
	$stderr,
	qr/invalid input syntax for type pg_lsn/,
	"get error for invalid LSN format");

# Test invalid timeout format
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' WITH (timeout 'invalid');",
	stderr => \$stderr);
like(
	$stderr,
	qr/invalid timeout value/,
	"get error for invalid timeout format");

# Test new WITH clause syntax: with NO_THROW the outcome is reported as the
# command's result instead of as an error.
$output = $node_standby->safe_psql(
	'postgres', qq[
	WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]);
is($output, "success", "WAIT FOR WITH clause syntax works correctly");

$output = $node_standby->safe_psql(
	'postgres', qq[
	WAIT FOR LSN '${lsn3}' WITH (timeout 100, no_throw);]);
is($output, "timeout",
	"WAIT FOR WITH clause returns correct timeout status");

# Test WITH clause error case - invalid option
# NOTE(review): this exercises the same error as the "unknown parameter"
# case above, only with a different option name.
$node_standby->psql(
	'postgres',
	"WAIT FOR LSN '${test_lsn}' WITH (invalid_option 'value');",
	stderr => \$stderr);
like(
	$stderr,
	qr/option "invalid_option" not recognized/,
	"get error for invalid WITH clause option");

# 6. Check the scenario of multiple LSN waiters.  We make 5 background
# psql sessions, each waiting for a corresponding insertion.  When the wait
# is finished, a stored function logs whether as many rows are visible as
# there should be.
# Helper for the waiters below: log_count(i) counts the rows of wait_test
# and emits 'count <i>' to the server log when at least 31 + i are visible.
$node_primary->safe_psql(
	'postgres', qq[
CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
  DECLARE
    count int;
  BEGIN
    SELECT count(*) FROM wait_test INTO count;
    IF count >= 31 + i THEN
      RAISE LOG 'count %', i;
    END IF;
  END
\$\$
LANGUAGE plpgsql;
]);

# Pause replay on the standby while the waiters are being started; it is
# resumed once all of them are in place.
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
my @psql_sessions;
for my $waiter (0 .. 4)
{
	# Insert one row on the primary and remember the insert LSN after it.
	$node_primary->safe_psql('postgres',
		"INSERT INTO wait_test VALUES (${waiter});");
	my $target_lsn = $node_primary->safe_psql('postgres',
		"SELECT pg_current_wal_insert_lsn()");

	# Start a background session on the standby that waits for that LSN
	# and then calls log_count(); query_until returns once "start" has
	# been echoed.
	push @psql_sessions, $node_standby->background_psql('postgres');
	$psql_sessions[-1]->query_until(
		qr/start/, qq[
		\\echo start
		WAIT FOR LSN '${target_lsn}';
		SELECT log_count(${waiter});
	]);
}

# Only log output written from this point on is searched for the waiters'
# messages.
my $log_offset = -s $node_standby->logfile;
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
for my $waiter (0 .. $#psql_sessions)
{
	$node_standby->wait_for_log("count ${waiter}", $log_offset);
	$psql_sessions[$waiter]->quit;
}

ok(1, 'multiple LSN waiters reported consistent data');

# 7. Check that the standby promotion terminates the wait on LSN.  Start
# waiting for an unreachable LSN, then promote.  Check the log for the
# relevant error message.  Also, check that waiting for an already replayed
# LSN doesn't cause an error even after promotion.
my $lsn4 = $node_primary->safe_psql('postgres',
	"SELECT pg_current_wal_insert_lsn() + 10000000000");
my $lsn5 =
  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
my $psql_session = $node_standby->background_psql('postgres');
$psql_session->query_until(
	qr/start/, qq[
	\\echo start
	WAIT FOR LSN '${lsn4}';
]);

# Make sure the standby will be promoted at least at the primary insert LSN
# we have just observed.  Use pg_switch_wal() to force the insert LSN to be
# written, then wait for the standby to catch up.
$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
$node_primary->wait_for_catchup($node_standby);

# Promote the standby and wait for the pending WAIT FOR to report, in the
# standby's log, that recovery is no longer in progress.
$log_offset = -s $node_standby->logfile;
$node_standby->promote;
$node_standby->wait_for_log('recovery is not in progress', $log_offset);

ok(1, 'got error after standby promote');

# safe_psql dies on error, so reaching the next ok() means the command
# succeeded.
$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");

ok(1, 'wait for already replayed LSN exits immediately even after promotion');

# With NO_THROW the "not in recovery" condition is returned as the command's
# result instead of being raised as an error.
$output = $node_standby->safe_psql(
	'postgres', qq[
	WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]);
is($output, "not in recovery",
	"WAIT FOR returns correct status after standby promotion");


$node_standby->stop;
$node_primary->stop;

# Do not call $psql_session->quit here: it would send \q to a session that
# may already be closed.  Only finish the underlying IPC::Run handle.
# NOTE(review): the previous wording of this comment said that \q is part of
# the initial script, but the script passed to query_until for this session
# contains no \q -- confirm which is intended.
$psql_session->{run}->finish;

done_testing();

diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 73df31344be..432509277c9 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3272,6 +3272,7 @@ WaitLSNState WaitLSNProcInfo WaitLSNResult WaitPMResult +WaitStmt WalCloseMethod WalCompression WalInsertClass