diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml index 3b8e842d1de..df72b3327c8 100644 --- a/doc/src/sgml/ref/wait_for.sgml +++ b/doc/src/sgml/ref/wait_for.sgml @@ -16,17 +16,23 @@ PostgreSQL documentation WAIT FOR - wait for target LSN to be replayed, optionally with a timeout + wait for WAL to reach a target LSN -WAIT FOR LSN 'lsn' [ WITH ( option [, ...] ) ] +WAIT FOR LSN 'lsn' + [ WITH ( option [, ...] ) ] where option can be: + MODE 'mode' TIMEOUT 'timeout' NO_THROW + +and mode can be: + + standby_replay | standby_write | standby_flush | primary_flush @@ -34,20 +40,27 @@ WAIT FOR LSN 'lsn' [ WITH ( Description - Waits until recovery replays lsn. - If no timeout is specified or it is set to - zero, this command waits indefinitely for the - lsn. - On timeout, or if the server is promoted before - lsn is reached, an error is emitted, - unless NO_THROW is specified in the WITH clause. - If NO_THROW is specified, then the command - doesn't throw errors. + Waits until the specified lsn is reached + according to the specified mode, + which determines whether to wait for WAL to be written, flushed, or replayed. + If no timeout is specified or it is set to + zero, this command waits indefinitely for the + lsn. - The possible return values are success, - timeout, and not in recovery. + On timeout, an error is emitted unless NO_THROW + is specified in the WITH clause. For standby modes + (standby_replay, standby_write, + standby_flush), an error is also emitted if the + server is promoted before the lsn is reached. + If NO_THROW is specified, the command returns + a status string instead of throwing errors. + + + + The possible return values are success, + timeout, and not in recovery. @@ -72,6 +85,65 @@ WAIT FOR LSN 'lsn' [ WITH ( + + MODE 'mode' + + + Specifies the type of LSN processing to wait for. If not specified, + the default is standby_replay. The valid modes are: + + + + + standby_replay: Wait for the LSN to be replayed + (applied to the database) on a standby server. After successful + completion, pg_last_wal_replay_lsn() will + return a value greater than or equal to the target LSN. This mode + can only be used during recovery. + + + + + standby_write: Wait for the WAL containing the + LSN to be received from the primary and written to disk on a + standby server, but not yet flushed. This is faster than + standby_flush but provides weaker durability + guarantees since the data may still be in operating system + buffers. After successful completion, the + written_lsn column in + + pg_stat_wal_receiver will show + a value greater than or equal to the target LSN. This mode can + only be used during recovery. + + + + + standby_flush: Wait for the WAL containing the + LSN to be received from the primary and flushed to disk on a + standby server. This provides a durability guarantee without + waiting for the WAL to be applied. After successful completion, + pg_last_wal_receive_lsn() will return a + value greater than or equal to the target LSN. This value is + also available as the flushed_lsn + column in + pg_stat_wal_receiver. This mode + can only be used during recovery. + + + + + primary_flush: Wait for the WAL containing the + LSN to be flushed to disk on a primary server. After successful + completion, pg_current_wal_flush_lsn() will + return a value greater than or equal to the target LSN. This mode + can only be used on a primary server (not during recovery). + + + + + + TIMEOUT 'timeout' @@ -135,9 +207,12 @@ WAIT FOR LSN 'lsn' [ WITH ( This return value denotes that the database server is not in a recovery - state. This might mean either the database server was not in recovery - at the moment of receiving the command, or it was promoted before - reaching the target lsn. + state. This might mean either the database server was not in recovery + at the moment of receiving the command (i.e., executed on a primary), + or it was promoted before reaching the target lsn. + In the promotion case, this status indicates a timeline change occurred, + and the application should re-evaluate whether the target LSN is still + relevant. @@ -148,25 +223,34 @@ WAIT FOR LSN 'lsn' [ WITH ( Notes - WAIT FOR command waits till - lsn to be replayed on standby. - That is, after this command execution, the value returned by - pg_last_wal_replay_lsn should be greater or equal - to the lsn value. This is useful to achieve - read-your-writes-consistency, while using async replica for reads and - primary for writes. In that case, the lsn of the last - modification should be stored on the client application side or the - connection pooler side. + WAIT FOR waits until the specified + lsn is reached according to the specified + mode. The standby_replay mode + waits for the LSN to be replayed (applied to the database), which is + useful to achieve read-your-writes consistency while using an async + replica for reads and the primary for writes. The + standby_flush mode waits for the WAL to be flushed + to durable storage on the replica, providing a durability guarantee + without waiting for replay. The standby_write mode + waits for the WAL to be written to the operating system, which is + faster than flush but provides weaker durability guarantees. The + primary_flush mode waits for WAL to be flushed on + a primary server. In all cases, the LSN of the last + modification should be stored on the client application side or the + connection pooler side. - WAIT FOR command should be called on standby. - If a user runs WAIT FOR on primary, it - will error out unless NO_THROW is specified in the WITH clause. - However, if WAIT FOR is - called on primary promoted from standby and lsn - was already replayed, then the WAIT FOR command just - exits immediately. + The standby modes (standby_replay, + standby_write, standby_flush) + can only be used during recovery, and primary_flush + can only be used on a primary server. Using the wrong mode for the + current server state will result in an error. If a standby is promoted + while waiting with a standby mode, the command will return + not in recovery (or throw an error if + NO_THROW is not specified). Promotion creates a new + timeline, and the LSN being waited for may refer to WAL from the old + timeline. @@ -175,21 +259,21 @@ WAIT FOR LSN 'lsn' [ WITH ( Examples - You can use WAIT FOR command to wait for - the pg_lsn value. For example, an application could update - the movie table and get the lsn after - changes just made. This example uses pg_current_wal_insert_lsn - on primary server to get the lsn given that - synchronous_commit could be set to - off. + You can use WAIT FOR command to wait for + the pg_lsn value. For example, an application could update + the movie table and get the lsn after + changes just made. This example uses pg_current_wal_insert_lsn + on primary server to get the lsn given that + synchronous_commit could be set to + off. postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; UPDATE 100 postgres=# SELECT pg_current_wal_insert_lsn(); -pg_current_wal_insert_lsn --------------------- -0/306EE20 + pg_current_wal_insert_lsn +--------------------------- + 0/306EE20 (1 row) @@ -200,7 +284,7 @@ pg_current_wal_insert_lsn postgres=# WAIT FOR LSN '0/306EE20'; status --------- +--------- success (1 row) postgres=# SELECT * FROM movie WHERE genre = 'Drama'; @@ -211,7 +295,43 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama'; - If the target LSN is not reached before the timeout, the error is thrown. + Wait for flush (data durable on replica): + + +postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_flush'); + status +--------- + success +(1 row) + + + + + Wait for write with timeout: + + +postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_write', TIMEOUT '100ms', NO_THROW); + status +--------- + success +(1 row) + + + + + Wait for flush on primary: + + +postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'primary_flush'); + status +--------- + success +(1 row) + + + + + If the target LSN is not reached before the timeout, an error is thrown: postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s'); @@ -221,11 +341,12 @@ ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current The same example uses WAIT FOR with - NO_THROW option. + NO_THROW option: + postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW); status --------- +--------- timeout (1 row) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 05ac7c5f7f8..81dc86847c0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2918,6 +2918,14 @@ XLogFlush(XLogRecPtr record) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for, notify the waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH]))) + WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush); + /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of XLOG. @@ -3100,6 +3108,14 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for, notify the waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH]))) + WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush); + /* * Great, done. To take some work off the critical path, try to initialize * as many of the no-longer-needed WAL buffers for future use as we can. @@ -6277,10 +6293,12 @@ StartupXLOG(void) WakeupCheckpointer(); /* - * Wake up all waiters for replay LSN. They need to report an error that - * recovery was ended before reaching the target LSN. + * Wake up all waiters. They need to report an error that recovery was + * ended before reaching the target LSN. */ WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, InvalidXLogRecPtr); /* * Shutdown the recovery environment. This must occur after diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c index 57d2dec07f1..97f1e778488 100644 --- a/src/backend/commands/wait.c +++ b/src/backend/commands/wait.c @@ -2,7 +2,7 @@ * * wait.c * Implements WAIT FOR, which allows waiting for events such as - * time passing or LSN having been replayed on replica. + * time passing or LSN having been replayed, flushed, or written. * * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group * @@ -15,6 +15,7 @@ #include +#include "access/xlog.h" #include "access/xlogrecovery.h" #include "access/xlogwait.h" #include "commands/defrem.h" @@ -34,12 +35,14 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) XLogRecPtr lsn; int64 timeout = 0; WaitLSNResult waitLSNResult; + WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */ bool throw = true; TupleDesc tupdesc; TupOutputState *tstate; const char *result = ""; bool timeout_specified = false; bool no_throw_specified = false; + bool mode_specified = false; /* Parse and validate the mandatory LSN */ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, @@ -47,7 +50,32 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) foreach_node(DefElem, defel, stmt->options) { - if (strcmp(defel->defname, "timeout") == 0) + if (strcmp(defel->defname, "mode") == 0) + { + char *mode_str; + + if (mode_specified) + errorConflictingDefElem(defel, pstate); + mode_specified = true; + + mode_str = defGetString(defel); + + if (pg_strcasecmp(mode_str, "standby_replay") == 0) + lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; + else if (pg_strcasecmp(mode_str, "standby_write") == 0) + lsnType = WAIT_LSN_TYPE_STANDBY_WRITE; + else if (pg_strcasecmp(mode_str, "standby_flush") == 0) + lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH; + else if (pg_strcasecmp(mode_str, "primary_flush") == 0) + lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized value for %s option \"%s\": \"%s\"", + "WAIT", defel->defname, mode_str), + parser_errposition(pstate, defel->location))); + } + else if (strcmp(defel->defname, "timeout") == 0) { char *timeout_str; const char *hintmsg; @@ -107,8 +135,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) } /* - * We are going to wait for the LSN replay. We should first care that we - * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * We are going to wait for the LSN. We should first care that we don't + * hold a snapshot and correspondingly our MyProc->xmin is invalid. * Otherwise, our snapshot could prevent the replay of WAL records * implying a kind of self-deadlock. This is the reason why WAIT FOR is a * command, not a procedure or function. @@ -140,7 +168,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) */ Assert(MyProc->xmin == InvalidTransactionId); - waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout); + /* + * Validate that the requested mode matches the current server state. + * Primary modes can only be used on a primary. + */ + if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH) + { + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is in progress"), + errhint("Waiting for primary_flush can only be done on a primary server. " + "Use standby_flush mode on a standby server."))); + } + + /* Now wait for the LSN */ + waitLSNResult = WaitForLSN(lsnType, lsn, timeout); /* * Process the result of WaitForLSN(). Throw appropriate error if needed. @@ -154,11 +197,48 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) case WAIT_LSN_RESULT_TIMEOUT: if (throw) - ereport(ERROR, - errcode(ERRCODE_QUERY_CANCELED), - errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", - LSN_FORMAT_ARGS(lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + { + XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType); + + switch (lsnType) + { + case WAIT_LSN_TYPE_STANDBY_REPLAY: + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + case WAIT_LSN_TYPE_STANDBY_WRITE: + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + case WAIT_LSN_TYPE_STANDBY_FLUSH: + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + case WAIT_LSN_TYPE_PRIMARY_FLUSH: + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + default: + elog(ERROR, "unexpected wait LSN type %d", lsnType); + pg_unreachable(); + } + } else result = "timeout"; break; @@ -168,18 +248,72 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) { if (PromoteIsTriggered()) { - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", - LSN_FORMAT_ARGS(lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType); + + switch (lsnType) + { + case WAIT_LSN_TYPE_STANDBY_REPLAY: + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + case WAIT_LSN_TYPE_STANDBY_WRITE: + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + case WAIT_LSN_TYPE_STANDBY_FLUSH: + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(currentLSN))); + break; + + default: + elog(ERROR, "unexpected wait LSN type %d", lsnType); + pg_unreachable(); + } } else - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errhint("Waiting for the replay LSN can only be executed during recovery.")); + { + switch (lsnType) + { + case WAIT_LSN_TYPE_STANDBY_REPLAY: + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the standby_replay LSN can only be executed during recovery.")); + break; + + case WAIT_LSN_TYPE_STANDBY_WRITE: + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the standby_write LSN can only be executed during recovery.")); + break; + + case WAIT_LSN_TYPE_STANDBY_FLUSH: + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the standby_flush LSN can only be executed during recovery.")); + break; + + default: + elog(ERROR, "unexpected wait LSN type %d", lsnType); + pg_unreachable(); + } + } } else result = "not in recovery"; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ac002f730c3..a41453530a1 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "catalog/pg_authid.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -965,6 +966,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) /* Update shared-memory status */ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + /* + * If we wrote an LSN that someone was waiting for, notify the waiters. + */ + if (waitLSNState && + (LogstreamResult.Write >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_WRITE]))) + WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, LogstreamResult.Write); + /* * Close the current segment if it's fully written up in the last cycle of * the loop, to create its archive notification file soon. Otherwise WAL @@ -1004,6 +1013,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) } SpinLockRelease(&walrcv->mutex); + /* + * If we flushed an LSN that someone was waiting for, notify the + * waiters. + */ + if (waitLSNState && + (LogstreamResult.Flush >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_FLUSH]))) + WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, LogstreamResult.Flush); + /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl index 5f415b9af51..bf61b8c47cf 100644 --- a/src/test/recovery/t/049_wait_for_lsn.pl +++ b/src/test/recovery/t/049_wait_for_lsn.pl @@ -1,5 +1,6 @@ -# Checks waiting for the LSN replay on standby using -# the WAIT FOR command. +# Checks waiting for the LSN using the WAIT FOR command. +# Tests standby modes (standby_replay/standby_write/standby_flush) on standby +# and primary_flush mode on primary. use strict; use warnings FATAL => 'all'; @@ -7,6 +8,42 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Helper functions to control walreceiver for testing wait conditions. +# These allow us to stop WAL streaming so waiters block, then resume it. +my $saved_primary_conninfo; + +sub stop_walreceiver +{ + my ($node) = @_; + $saved_primary_conninfo = $node->safe_psql( + 'postgres', qq[ + SELECT pg_catalog.quote_literal(setting) + FROM pg_settings + WHERE name = 'primary_conninfo'; + ]); + $node->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET primary_conninfo = ''; + SELECT pg_reload_conf(); + ]); + + $node->poll_query_until('postgres', + "SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);"); +} + +sub resume_walreceiver +{ + my ($node) = @_; + $node->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET primary_conninfo = $saved_primary_conninfo; + SELECT pg_reload_conf(); + ]); + + $node->poll_query_until('postgres', + "SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);"); +} + # Initialize primary node my $node_primary = PostgreSQL::Test::Cluster->new('primary'); $node_primary->init(allows_streaming => 1); @@ -62,7 +99,52 @@ $output = $node_standby->safe_psql( ok((split("\n", $output))[-1] eq 30, "standby reached the same LSN as primary"); -# 3. Check that waiting for unreachable LSN triggers the timeout. The +# 3. Check that WAIT FOR works with standby_write, standby_flush, and +# primary_flush modes. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(31, 40))"); +my $lsn_write = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_write}' WITH (MODE 'standby_write', timeout '1d'); + SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn); +]); + +ok( (split("\n", $output))[-1] >= 0, + "standby wrote WAL up to target LSN after WAIT FOR with MODE 'standby_write'" +); + +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(41, 50))"); +my $lsn_flush = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_flush}' WITH (MODE 'standby_flush', timeout '1d'); + SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn); +]); + +ok( (split("\n", $output))[-1] >= 0, + "standby flushed WAL up to target LSN after WAIT FOR with MODE 'standby_flush'" +); + +# Check primary_flush mode on primary +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(51, 60))"); +my $lsn_primary_flush = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_primary->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_primary_flush}' WITH (MODE 'primary_flush', timeout '1d'); + SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '${lsn_primary_flush}'::pg_lsn); +]); + +ok( (split("\n", $output))[-1] >= 0, + "primary flushed WAL up to target LSN after WAIT FOR with MODE 'primary_flush'" +); + +# 4. Check that waiting for unreachable LSN triggers the timeout. The # unreachable LSN must be well in advance. So WAL records issued by # the concurrent autovacuum could not affect that. my $lsn3 = @@ -88,14 +170,26 @@ $output = $node_standby->safe_psql( WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]); ok($output eq "timeout", "WAIT FOR returns correct status after timeout"); -# 4. Check that WAIT FOR triggers an error if called on primary, -# within another function, or inside a transaction with an isolation level -# higher than READ COMMITTED. +# 5. Check mode validation: standby modes error on primary, primary mode errors +# on standby, and primary_flush works on primary. Also check that WAIT FOR +# triggers an error if called within another function or inside a transaction +# with an isolation level higher than READ COMMITTED. -$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';", +# Test standby_flush on primary - should error +$node_primary->psql( + 'postgres', + "WAIT FOR LSN '${lsn3}' WITH (MODE 'standby_flush');", stderr => \$stderr); -ok( $stderr =~ /recovery is not in progress/, - "get an error when running on the primary"); +ok($stderr =~ /recovery is not in progress/, + "get an error when running standby_flush on the primary"); + +# Test primary_flush on standby - should error +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${lsn3}' WITH (MODE 'primary_flush');", + stderr => \$stderr); +ok($stderr =~ /recovery is in progress/, + "get an error when running primary_flush on the standby"); $node_standby->psql( 'postgres', @@ -125,7 +219,7 @@ ok( $stderr =~ /WAIT FOR must be called without an active or registered snapshot/, "get an error when running within another function"); -# 5. Check parameter validation error cases on standby before promotion +# 6. Check parameter validation error cases on standby before promotion my $test_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); @@ -208,10 +302,26 @@ $node_standby->psql( ok( $stderr =~ /option "invalid_option" not recognized/, "get error for invalid WITH clause option"); -# 6. Check the scenario of multiple LSN waiters. We make 5 background -# psql sessions each waiting for a corresponding insertion. When waiting is -# finished, stored procedures logs if there are visible as many rows as -# should be. +# Test invalid MODE value +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (MODE 'invalid');", + stderr => \$stderr); +ok($stderr =~ /unrecognized value for WAIT option "mode": "invalid"/, + "get error for invalid MODE value"); + +# Test duplicate MODE parameter +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (MODE 'standby_replay', MODE 'standby_write');", + stderr => \$stderr); +ok( $stderr =~ /conflicting or redundant options/, + "get error for duplicate MODE parameter"); + +# 7a. Check the scenario of multiple standby_replay waiters. We make 5 +# background psql sessions each waiting for a corresponding insertion. When +# waiting is finished, stored procedures logs if there are visible as many +# rows as should be. $node_primary->safe_psql( 'postgres', qq[ CREATE FUNCTION log_count(i int) RETURNS void AS \$\$ @@ -225,8 +335,17 @@ CREATE FUNCTION log_count(i int) RETURNS void AS \$\$ END \$\$ LANGUAGE plpgsql; + +CREATE FUNCTION log_wait_done(prefix text, i int) RETURNS void AS \$\$ + BEGIN + RAISE LOG '% %', prefix, i; + END +\$\$ +LANGUAGE plpgsql; ]); + $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); + my @psql_sessions; for (my $i = 0; $i < 5; $i++) { @@ -243,6 +362,7 @@ for (my $i = 0; $i < 5; $i++) SELECT log_count(${i}); ]); } + my $log_offset = -s $node_standby->logfile; $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); for (my $i = 0; $i < 5; $i++) @@ -251,23 +371,246 @@ for (my $i = 0; $i < 5; $i++) $psql_sessions[$i]->quit; } -ok(1, 'multiple LSN waiters reported consistent data'); +ok(1, 'multiple standby_replay waiters reported consistent data'); -# 7. Check that the standby promotion terminates the wait on LSN. Start -# waiting for an unreachable LSN then promote. Check the log for the relevant -# error message. Also, check that waiting for already replayed LSN doesn't -# cause an error even after promotion. +# 7b. Check the scenario of multiple standby_write waiters. +# Stop walreceiver to ensure waiters actually block. +stop_walreceiver($node_standby); + +# Generate WAL on primary (standby won't receive it yet) +my @write_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (100 + ${i});"); + $write_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +# Start standby_write waiters (they will block since walreceiver is stopped) +my @write_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $write_sessions[$i] = $node_standby->background_psql('postgres'); + $write_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$write_lsns[$i]' WITH (MODE 'standby_write', timeout '1d'); + SELECT log_wait_done('write_done', $i); + ]); +} + +# Verify waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'" +); + +# Restore walreceiver to unblock waiters +my $write_log_offset = -s $node_standby->logfile; +resume_walreceiver($node_standby); + +# Wait for all waiters to complete and close sessions +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("write_done $i", $write_log_offset); + $write_sessions[$i]->quit; +} + +# Verify on standby that WAL was written up to the target LSN +$output = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);" +); + +ok($output >= 0, + "multiple standby_write waiters: standby wrote WAL up to target LSN"); + +# 7c. Check the scenario of multiple standby_flush waiters. +# Stop walreceiver to ensure waiters actually block. +stop_walreceiver($node_standby); + +# Generate WAL on primary (standby won't receive it yet) +my @flush_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (200 + ${i});"); + $flush_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +# Start standby_flush waiters (they will block since walreceiver is stopped) +my @flush_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $flush_sessions[$i] = $node_standby->background_psql('postgres'); + $flush_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$flush_lsns[$i]' WITH (MODE 'standby_flush', timeout '1d'); + SELECT log_wait_done('flush_done', $i); + ]); +} + +# Verify waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'" +); + +# Restore walreceiver to unblock waiters +my $flush_log_offset = -s $node_standby->logfile; +resume_walreceiver($node_standby); + +# Wait for all waiters to complete and close sessions +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("flush_done $i", $flush_log_offset); + $flush_sessions[$i]->quit; +} + +# Verify on standby that WAL was flushed up to the target LSN +$output = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);" +); + +ok($output >= 0, + "multiple standby_flush waiters: standby flushed WAL up to target LSN"); + +# 7d. Check the scenario of mixed standby mode waiters (standby_replay, +# standby_write, standby_flush) running concurrently. We start 6 sessions: +# 2 for each mode, all waiting for the same target LSN. We stop the +# walreceiver and pause replay to ensure all waiters block. Then we resume +# replay and restart the walreceiver to verify they unblock and complete +# correctly. + +# Stop walreceiver first to ensure we can control the flow without hanging +# (stopping it after pausing replay can hang if the startup process is paused). +stop_walreceiver($node_standby); + +# Pause replay +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); + +# Generate WAL on primary +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(301, 310));"); +my $mixed_target_lsn = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); + +# Start 6 waiters: 2 for each mode +my @mixed_sessions; +my @mixed_modes = ('standby_replay', 'standby_write', 'standby_flush'); +for (my $i = 0; $i < 6; $i++) +{ + $mixed_sessions[$i] = $node_standby->background_psql('postgres'); + $mixed_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${mixed_target_lsn}' WITH (MODE '$mixed_modes[$i % 3]', timeout '1d'); + SELECT log_wait_done('mixed_done', $i); + ]); +} + +# Verify all waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'" +); + +# Resume replay (waiters should still be blocked as no WAL has arrived) +my $mixed_log_offset = -s $node_standby->logfile; +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); +$node_standby->poll_query_until('postgres', + "SELECT NOT pg_is_wal_replay_paused();"); + +# Restore walreceiver to allow WAL to arrive +resume_walreceiver($node_standby); + +# Wait for all sessions to complete and close them +for (my $i = 0; $i < 6; $i++) +{ + $node_standby->wait_for_log("mixed_done $i", $mixed_log_offset); + $mixed_sessions[$i]->quit; +} + +# Verify all modes reached the target LSN +$output = $node_standby->safe_psql( + 'postgres', qq[ + SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND + pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND + pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0; +]); + +ok($output eq 't', + "mixed mode waiters: all modes completed and reached target LSN"); + +# 7e. Check the scenario of multiple primary_flush waiters on primary. +# We start 5 background sessions waiting for different LSNs with primary_flush +# mode. Each waiter logs when done. +my @primary_flush_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (400 + ${i});"); + $primary_flush_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +my $primary_flush_log_offset = -s $node_primary->logfile; + +# Start primary_flush waiters +my @primary_flush_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $primary_flush_sessions[$i] = $node_primary->background_psql('postgres'); + $primary_flush_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$primary_flush_lsns[$i]' WITH (MODE 'primary_flush', timeout '1d'); + SELECT log_wait_done('primary_flush_done', $i); + ]); +} + +# The WAL should already be flushed, so waiters should complete quickly +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->wait_for_log("primary_flush_done $i", + $primary_flush_log_offset); + $primary_flush_sessions[$i]->quit; +} + +# Verify on primary that WAL was flushed up to the target LSN +$output = $node_primary->safe_psql('postgres', + "SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '$primary_flush_lsns[4]'::pg_lsn);" +); + +ok($output >= 0, + "multiple primary_flush waiters: primary flushed WAL up to target LSN"); + +# 8. Check that the standby promotion terminates all standby wait modes. Start +# waiting for unreachable LSNs with standby_replay, standby_write, and +# standby_flush modes, then promote. Check the log for the relevant error +# messages. Also, check that waiting for already replayed LSN doesn't cause +# an error even after promotion. my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn() + 10000000000"); + my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); -my $psql_session = $node_standby->background_psql('postgres'); -$psql_session->query_until( - qr/start/, qq[ - \\echo start - WAIT FOR LSN '${lsn4}'; -]); + +# Start background sessions waiting for unreachable LSN with all modes +my @wait_modes = ('standby_replay', 'standby_write', 'standby_flush'); +my @wait_sessions; +for (my $i = 0; $i < 3; $i++) +{ + $wait_sessions[$i] = $node_standby->background_psql('postgres'); + $wait_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${lsn4}' WITH (MODE '$wait_modes[$i]'); + ]); +} # Make sure standby will be promoted at least at the primary insert LSN we # have just observed. Use pg_switch_wal() to force the insert LSN to be @@ -277,9 +620,16 @@ $node_primary->wait_for_catchup($node_standby); $log_offset = -s $node_standby->logfile; $node_standby->promote; -$node_standby->wait_for_log('recovery is not in progress', $log_offset); -ok(1, 'got error after standby promote'); +# Wait for all three sessions to get the error (each mode has distinct message) +$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/, + $log_offset); +$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/, + $log_offset); +$node_standby->wait_for_log( + qr/Recovery ended before target LSN.*was replayed/, $log_offset); + +ok(1, 'promotion interrupted all wait modes'); $node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';"); @@ -295,8 +645,11 @@ ok($output eq "not in recovery", $node_standby->stop; $node_primary->stop; -# If we send \q with $psql_session->quit the command can be sent to the session +# If we send \q with $session->quit the command can be sent to the session # already closed. So \q is in initial script, here we only finish IPC::Run. -$psql_session->{run}->finish; +for (my $i = 0; $i < 3; $i++) +{ + $wait_sessions[$i]->{run}->finish; +} done_testing();