mirror of
https://github.com/postgres/postgres.git
synced 2025-07-07 00:36:50 +03:00
Revert pg_wal_replay_wait() stored procedure
This commit reverts 3c5db1d6b0, and subsequent improvements and fixes including 8036d73ae3, 867d396ccd, 3ac3ec580c, 0868d7ae70, 85b98b8d5a, 2520226c95, 014f9f34d2, e658038772, e1555645d7, 5035172e4a, 6cfebfe88b, 73da6b8d1b, and e546989a26. The reason for reverting is a set of remaining issues. Most notably, the stored procedure appears to need more effort than the utility statement to turn the backend into a "snapshot-less" state. This makes an approach to use stored procedures questionable. Catversion is bumped. Discussion: https://postgr.es/m/Zyhj2anOPRKtb0xW%40paquier.xyz
This commit is contained in:
@ -29000,176 +29000,6 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
|
|||||||
the pause, the rate of WAL generation and available disk space.
|
the pause, the rate of WAL generation and available disk space.
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
|
||||||
The procedure shown in <xref linkend="recovery-synchronization-procedure-table"/>
|
|
||||||
can be executed only during recovery.
|
|
||||||
</para>
|
|
||||||
|
|
||||||
<table id="recovery-synchronization-procedure-table">
|
|
||||||
<title>Recovery Synchronization Procedure and Function</title>
|
|
||||||
<tgroup cols="1">
|
|
||||||
<thead>
|
|
||||||
<row>
|
|
||||||
<entry role="func_table_entry"><para role="func_signature">
|
|
||||||
Procedure or Function
|
|
||||||
</para>
|
|
||||||
<para>
|
|
||||||
Type
|
|
||||||
</para>
|
|
||||||
<para>
|
|
||||||
Description
|
|
||||||
</para></entry>
|
|
||||||
</row>
|
|
||||||
</thead>
|
|
||||||
|
|
||||||
<tbody>
|
|
||||||
<row>
|
|
||||||
<entry role="func_table_entry"><para role="func_signature">
|
|
||||||
<indexterm>
|
|
||||||
<primary>pg_wal_replay_wait</primary>
|
|
||||||
</indexterm>
|
|
||||||
<function>pg_wal_replay_wait</function> (
|
|
||||||
<parameter>target_lsn</parameter> <type>pg_lsn</type>,
|
|
||||||
<parameter>timeout</parameter> <type>bigint</type> <literal>DEFAULT</literal> <literal>0</literal>,
|
|
||||||
<parameter>no_error</parameter> <type>bool</type> <literal>DEFAULT</literal> <literal>false</literal>)
|
|
||||||
</para>
|
|
||||||
<para>
|
|
||||||
Procedure
|
|
||||||
</para>
|
|
||||||
<para>
|
|
||||||
Waits until recovery replays <literal>target_lsn</literal>.
|
|
||||||
If no <parameter>timeout</parameter> is specified or it is set to
|
|
||||||
zero, this procedure waits indefinitely for the
|
|
||||||
<literal>target_lsn</literal>. If the <parameter>timeout</parameter>
|
|
||||||
is specified (in milliseconds) and is greater than zero, the
|
|
||||||
procedure waits until <literal>target_lsn</literal> is reached or
|
|
||||||
the specified <parameter>timeout</parameter> has elapsed.
|
|
||||||
On timeout, or if the server is promoted before
|
|
||||||
<literal>target_lsn</literal> is reached, an error is emitted,
|
|
||||||
as long as <parameter>no_error</parameter> is false.
|
|
||||||
If <parameter>no_error</parameter> is set to true, then the procedure
|
|
||||||
doesn't throw errors. The last result status could be read
|
|
||||||
with <function>pg_wal_replay_wait_status</function>.
|
|
||||||
</para></entry>
|
|
||||||
</row>
|
|
||||||
|
|
||||||
<row>
|
|
||||||
<entry role="func_table_entry"><para role="func_signature">
|
|
||||||
<indexterm>
|
|
||||||
<primary>pg_wal_replay_wait_status</primary>
|
|
||||||
</indexterm>
|
|
||||||
<function>pg_wal_replay_wait_status</function> ()
|
|
||||||
<returnvalue>text</returnvalue>
|
|
||||||
</para>
|
|
||||||
<para>
|
|
||||||
Function
|
|
||||||
</para>
|
|
||||||
<para>
|
|
||||||
Returns the last result status for
|
|
||||||
<function>pg_wal_replay_wait</function> procedure. The possible
|
|
||||||
values are <literal>success</literal>, <literal>timeout</literal>,
|
|
||||||
and <literal>not in recovery</literal>.
|
|
||||||
</para></entry>
|
|
||||||
</row>
|
|
||||||
</tbody>
|
|
||||||
</tgroup>
|
|
||||||
</table>
|
|
||||||
|
|
||||||
<para>
|
|
||||||
<function>pg_wal_replay_wait</function> waits for
|
|
||||||
<parameter>target_lsn</parameter> to be replayed on standby.
|
|
||||||
That is, after this procedure completes, the value returned by
|
|
||||||
<function>pg_last_wal_replay_lsn</function> should be greater than or equal
|
|
||||||
to the <parameter>target_lsn</parameter> value. This is useful to achieve
|
|
||||||
read-your-writes consistency while using an asynchronous replica for reads and
|
|
||||||
primary for writes. In that case <acronym>lsn</acronym> of the last
|
|
||||||
modification should be stored on the client application side or the
|
|
||||||
connection pooler side.
|
|
||||||
</para>
|
|
||||||
|
|
||||||
<para>
|
|
||||||
<function>pg_wal_replay_wait</function> should be called on standby.
|
|
||||||
If a user calls <function>pg_wal_replay_wait</function> on primary, it
|
|
||||||
will error out as long as <parameter>no_error</parameter> is false.
|
|
||||||
However, if <function>pg_wal_replay_wait</function> is
|
|
||||||
called on primary promoted from standby and <literal>target_lsn</literal>
|
|
||||||
was already replayed, then <function>pg_wal_replay_wait</function> just
|
|
||||||
exits immediately.
|
|
||||||
</para>
|
|
||||||
|
|
||||||
<para>
|
|
||||||
You can use <function>pg_wal_replay_wait</function> to wait for
|
|
||||||
the <type>pg_lsn</type> value. For example, an application could update
|
|
||||||
the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
|
|
||||||
changes just made. This example uses <function>pg_current_wal_insert_lsn</function>
|
|
||||||
on primary server to get the <acronym>lsn</acronym> given that
|
|
||||||
<varname>synchronous_commit</varname> could be set to
|
|
||||||
<literal>off</literal>.
|
|
||||||
|
|
||||||
<programlisting>
|
|
||||||
postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
|
|
||||||
UPDATE 100
|
|
||||||
postgres=# SELECT pg_current_wal_insert_lsn();
|
|
||||||
pg_current_wal_insert_lsn
|
|
||||||
--------------------
|
|
||||||
0/306EE20
|
|
||||||
(1 row)
|
|
||||||
</programlisting>
|
|
||||||
|
|
||||||
Then an application could run <function>pg_wal_replay_wait</function>
|
|
||||||
with the <acronym>lsn</acronym> obtained from primary. After that the
|
|
||||||
changes made on primary should be guaranteed to be visible on replica.
|
|
||||||
|
|
||||||
<programlisting>
|
|
||||||
postgres=# CALL pg_wal_replay_wait('0/306EE20');
|
|
||||||
CALL
|
|
||||||
postgres=# SELECT * FROM movie WHERE genre = 'Drama';
|
|
||||||
genre
|
|
||||||
-------
|
|
||||||
(0 rows)
|
|
||||||
</programlisting>
|
|
||||||
|
|
||||||
It may also happen that target <acronym>lsn</acronym> is not reached
|
|
||||||
within the timeout. In that case an error is thrown.
|
|
||||||
|
|
||||||
<programlisting>
|
|
||||||
postgres=# CALL pg_wal_replay_wait('0/306EE20', 100);
|
|
||||||
ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
|
|
||||||
</programlisting>
|
|
||||||
|
|
||||||
The same example uses <function>pg_wal_replay_wait</function> with
|
|
||||||
<parameter>no_error</parameter> set to true. In this case, the result
|
|
||||||
status must be read with <function>pg_wal_replay_wait_status</function>.
|
|
||||||
|
|
||||||
<programlisting>
|
|
||||||
postgres=# CALL pg_wal_replay_wait('0/306EE20', 100, true);
|
|
||||||
CALL
|
|
||||||
postgres=# SELECT pg_wal_replay_wait_status();
|
|
||||||
pg_wal_replay_wait_status
|
|
||||||
---------------------------
|
|
||||||
timeout
|
|
||||||
(1 row)
|
|
||||||
</programlisting>
|
|
||||||
|
|
||||||
</para>
|
|
||||||
|
|
||||||
<para>
|
|
||||||
<function>pg_wal_replay_wait</function> can't be used within
|
|
||||||
a transaction with an isolation level higher than
|
|
||||||
<literal>READ COMMITTED</literal>, another procedure, or a function.
|
|
||||||
All the cases above imply holding a snapshot, which could prevent
|
|
||||||
WAL records from replaying (see <xref linkend="hot-standby-conflict"/>)
|
|
||||||
and cause an indirect deadlock.
|
|
||||||
|
|
||||||
<programlisting>
|
|
||||||
postgres=# BEGIN;
|
|
||||||
BEGIN
|
|
||||||
postgres=*# CALL pg_wal_replay_wait('0/306EE20');
|
|
||||||
ERROR: pg_wal_replay_wait() must be only called without an active or registered snapshot
|
|
||||||
DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.
|
|
||||||
</programlisting>
|
|
||||||
|
|
||||||
</para>
|
|
||||||
</sect2>
|
</sect2>
|
||||||
|
|
||||||
<sect2 id="functions-snapshot-synchronization">
|
<sect2 id="functions-snapshot-synchronization">
|
||||||
|
@ -36,8 +36,7 @@ OBJS = \
|
|||||||
xlogreader.o \
|
xlogreader.o \
|
||||||
xlogrecovery.o \
|
xlogrecovery.o \
|
||||||
xlogstats.o \
|
xlogstats.o \
|
||||||
xlogutils.o \
|
xlogutils.o
|
||||||
xlogwait.o
|
|
||||||
|
|
||||||
include $(top_srcdir)/src/backend/common.mk
|
include $(top_srcdir)/src/backend/common.mk
|
||||||
|
|
||||||
|
@ -24,7 +24,6 @@ backend_sources += files(
|
|||||||
'xlogrecovery.c',
|
'xlogrecovery.c',
|
||||||
'xlogstats.c',
|
'xlogstats.c',
|
||||||
'xlogutils.c',
|
'xlogutils.c',
|
||||||
'xlogwait.c',
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# used by frontend programs to build a frontend xlogreader
|
# used by frontend programs to build a frontend xlogreader
|
||||||
|
@ -31,7 +31,6 @@
|
|||||||
#include "access/xloginsert.h"
|
#include "access/xloginsert.h"
|
||||||
#include "access/xlogrecovery.h"
|
#include "access/xlogrecovery.h"
|
||||||
#include "access/xlogutils.h"
|
#include "access/xlogutils.h"
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "catalog/index.h"
|
#include "catalog/index.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
#include "catalog/pg_enum.h"
|
#include "catalog/pg_enum.h"
|
||||||
@ -2827,11 +2826,6 @@ AbortTransaction(void)
|
|||||||
*/
|
*/
|
||||||
LWLockReleaseAll();
|
LWLockReleaseAll();
|
||||||
|
|
||||||
/*
|
|
||||||
* Cleanup waiting for LSN if any.
|
|
||||||
*/
|
|
||||||
WaitLSNCleanup();
|
|
||||||
|
|
||||||
/* Clear wait information and command progress indicator */
|
/* Clear wait information and command progress indicator */
|
||||||
pgstat_report_wait_end();
|
pgstat_report_wait_end();
|
||||||
pgstat_progress_end_command();
|
pgstat_progress_end_command();
|
||||||
|
@ -62,7 +62,6 @@
|
|||||||
#include "access/xlogreader.h"
|
#include "access/xlogreader.h"
|
||||||
#include "access/xlogrecovery.h"
|
#include "access/xlogrecovery.h"
|
||||||
#include "access/xlogutils.h"
|
#include "access/xlogutils.h"
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "backup/basebackup.h"
|
#include "backup/basebackup.h"
|
||||||
#include "catalog/catversion.h"
|
#include "catalog/catversion.h"
|
||||||
#include "catalog/pg_control.h"
|
#include "catalog/pg_control.h"
|
||||||
@ -6174,12 +6173,6 @@ StartupXLOG(void)
|
|||||||
UpdateControlFile();
|
UpdateControlFile();
|
||||||
LWLockRelease(ControlFileLock);
|
LWLockRelease(ControlFileLock);
|
||||||
|
|
||||||
/*
|
|
||||||
* Wake up all waiters for replay LSN. They need to report an error that
|
|
||||||
* recovery was ended before reaching the target LSN.
|
|
||||||
*/
|
|
||||||
WaitLSNWakeup(InvalidXLogRecPtr);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Shutdown the recovery environment. This must occur after
|
* Shutdown the recovery environment. This must occur after
|
||||||
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
|
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
|
||||||
|
@ -22,19 +22,17 @@
|
|||||||
#include "access/xlog_internal.h"
|
#include "access/xlog_internal.h"
|
||||||
#include "access/xlogbackup.h"
|
#include "access/xlogbackup.h"
|
||||||
#include "access/xlogrecovery.h"
|
#include "access/xlogrecovery.h"
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "funcapi.h"
|
#include "funcapi.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "pgstat.h"
|
#include "pgstat.h"
|
||||||
#include "replication/walreceiver.h"
|
#include "replication/walreceiver.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/latch.h"
|
||||||
#include "storage/standby.h"
|
#include "storage/standby.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#include "utils/pg_lsn.h"
|
#include "utils/pg_lsn.h"
|
||||||
#include "utils/snapmgr.h"
|
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -750,115 +748,3 @@ pg_promote(PG_FUNCTION_ARGS)
|
|||||||
wait_seconds)));
|
wait_seconds)));
|
||||||
PG_RETURN_BOOL(false);
|
PG_RETURN_BOOL(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
 * Outcome of the most recent WaitForLSNReplay() call issued by
 * pg_wal_replay_wait() in this file; read back by
 * pg_wal_replay_wait_status().  Starts out as "success".
 */
static WaitLSNResult lastWaitLSNResult = WAIT_LSN_RESULT_SUCCESS;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Waits until recovery replays the target LSN with optional timeout. Unless
|
|
||||||
* 'no_error' provided throws an error on failure
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
pg_wal_replay_wait(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
XLogRecPtr target_lsn = PG_GETARG_LSN(0);
|
|
||||||
int64 timeout = PG_GETARG_INT64(1);
|
|
||||||
bool no_error = PG_GETARG_BOOL(2);
|
|
||||||
|
|
||||||
if (timeout < 0)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
|
|
||||||
errmsg("\"timeout\" must not be negative")));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We are going to wait for the LSN replay. We should first care that we
|
|
||||||
* don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
|
|
||||||
* Otherwise, our snapshot could prevent the replay of WAL records
|
|
||||||
* implying a kind of self-deadlock. This is the reason why
|
|
||||||
* pg_wal_replay_wait() is a procedure, not a function.
|
|
||||||
*
|
|
||||||
* At first, we should check there is no active snapshot. According to
|
|
||||||
* PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
|
|
||||||
* processed with a snapshot. Thankfully, we can pop this snapshot,
|
|
||||||
* because PortalRunUtility() can tolerate this.
|
|
||||||
*/
|
|
||||||
if (ActiveSnapshotSet())
|
|
||||||
PopActiveSnapshot();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* At second, invalidate a catalog snapshot if any. And we should be done
|
|
||||||
* with the preparation.
|
|
||||||
*/
|
|
||||||
InvalidateCatalogSnapshot();
|
|
||||||
|
|
||||||
/* Give up if there is still an active or registered snapshot. */
|
|
||||||
if (GetOldestSnapshot())
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
||||||
errmsg("pg_wal_replay_wait() must be only called without an active or registered snapshot"),
|
|
||||||
errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.")));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* As the result we should hold no snapshot, and correspondingly our xmin
|
|
||||||
* should be unset.
|
|
||||||
*/
|
|
||||||
Assert(MyProc->xmin == InvalidTransactionId);
|
|
||||||
|
|
||||||
lastWaitLSNResult = WaitForLSNReplay(target_lsn, timeout);
|
|
||||||
|
|
||||||
if (no_error)
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Process the result of WaitForLSNReplay(). Throw appropriate error if
|
|
||||||
* needed.
|
|
||||||
*/
|
|
||||||
switch (lastWaitLSNResult)
|
|
||||||
{
|
|
||||||
case WAIT_LSN_RESULT_SUCCESS:
|
|
||||||
/* Nothing to do on success */
|
|
||||||
break;
|
|
||||||
|
|
||||||
case WAIT_LSN_RESULT_TIMEOUT:
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_QUERY_CANCELED),
|
|
||||||
errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
|
|
||||||
LSN_FORMAT_ARGS(target_lsn),
|
|
||||||
LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))));
|
|
||||||
break;
|
|
||||||
|
|
||||||
case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
||||||
errmsg("recovery is not in progress"),
|
|
||||||
errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
|
|
||||||
LSN_FORMAT_ARGS(target_lsn),
|
|
||||||
LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
Datum
|
|
||||||
pg_wal_replay_wait_status(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
const char *result_string = "";
|
|
||||||
|
|
||||||
/* Process the result of WaitForLSNReplay(). */
|
|
||||||
switch (lastWaitLSNResult)
|
|
||||||
{
|
|
||||||
case WAIT_LSN_RESULT_SUCCESS:
|
|
||||||
result_string = "success";
|
|
||||||
break;
|
|
||||||
|
|
||||||
case WAIT_LSN_RESULT_TIMEOUT:
|
|
||||||
result_string = "timeout";
|
|
||||||
break;
|
|
||||||
|
|
||||||
case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
|
|
||||||
result_string = "not in recovery";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
PG_RETURN_TEXT_P(cstring_to_text(result_string));
|
|
||||||
}
|
|
||||||
|
@ -40,7 +40,6 @@
|
|||||||
#include "access/xlogreader.h"
|
#include "access/xlogreader.h"
|
||||||
#include "access/xlogrecovery.h"
|
#include "access/xlogrecovery.h"
|
||||||
#include "access/xlogutils.h"
|
#include "access/xlogutils.h"
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "backup/basebackup.h"
|
#include "backup/basebackup.h"
|
||||||
#include "catalog/pg_control.h"
|
#include "catalog/pg_control.h"
|
||||||
#include "commands/tablespace.h"
|
#include "commands/tablespace.h"
|
||||||
@ -1829,16 +1828,6 @@ PerformWalRecovery(void)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* If we replayed an LSN that someone was waiting for then walk
|
|
||||||
* over the shared memory array and set latches to notify the
|
|
||||||
* waiters.
|
|
||||||
*/
|
|
||||||
if (waitLSNState &&
|
|
||||||
(XLogRecoveryCtl->lastReplayedEndRecPtr >=
|
|
||||||
pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
|
|
||||||
WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr);
|
|
||||||
|
|
||||||
/* Else, try to fetch the next WAL record */
|
/* Else, try to fetch the next WAL record */
|
||||||
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
|
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
|
||||||
} while (record != NULL);
|
} while (record != NULL);
|
||||||
|
@ -1,337 +0,0 @@
|
|||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* xlogwait.c
|
|
||||||
* Implements waiting for the given replay LSN, which is used in
|
|
||||||
* CALL pg_wal_replay_wait(target_lsn pg_lsn,
|
|
||||||
* timeout int8, no_error bool).
|
|
||||||
*
|
|
||||||
* Copyright (c) 2024, PostgreSQL Global Development Group
|
|
||||||
*
|
|
||||||
* IDENTIFICATION
|
|
||||||
* src/backend/access/transam/xlogwait.c
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "postgres.h"
|
|
||||||
|
|
||||||
#include <float.h>
|
|
||||||
#include <math.h>
|
|
||||||
|
|
||||||
#include "pgstat.h"
|
|
||||||
#include "access/xlog.h"
|
|
||||||
#include "access/xlogrecovery.h"
|
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "miscadmin.h"
|
|
||||||
#include "storage/latch.h"
|
|
||||||
#include "storage/proc.h"
|
|
||||||
#include "storage/shmem.h"
|
|
||||||
#include "utils/fmgrprotos.h"
|
|
||||||
#include "utils/pg_lsn.h"
|
|
||||||
#include "utils/snapmgr.h"
|
|
||||||
|
|
||||||
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
|
|
||||||
void *arg);
|
|
||||||
|
|
||||||
/*
 * Pointer to the WaitLSNState structure; NULL until WaitLSNShmemInit()
 * assigns it the result of ShmemInitStruct().
 */
struct WaitLSNState *waitLSNState = NULL;
|
|
||||||
|
|
||||||
/* Report the amount of shared memory space needed for WaitLSNState. */
|
|
||||||
Size
|
|
||||||
WaitLSNShmemSize(void)
|
|
||||||
{
|
|
||||||
Size size;
|
|
||||||
|
|
||||||
size = offsetof(WaitLSNState, procInfos);
|
|
||||||
size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
|
|
||||||
return size;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Initialize the WaitLSNState in the shared memory. */
|
|
||||||
void
|
|
||||||
WaitLSNShmemInit(void)
|
|
||||||
{
|
|
||||||
bool found;
|
|
||||||
|
|
||||||
waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
|
|
||||||
WaitLSNShmemSize(),
|
|
||||||
&found);
|
|
||||||
if (!found)
|
|
||||||
{
|
|
||||||
pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
|
|
||||||
pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
|
|
||||||
memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Comparison function for waitLSN->waitersHeap heap. Waiting processes are
|
|
||||||
* ordered by lsn, so that the waiter with smallest lsn is at the top.
|
|
||||||
*/
|
|
||||||
static int
|
|
||||||
waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
|
|
||||||
{
|
|
||||||
const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
|
|
||||||
const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
|
|
||||||
|
|
||||||
if (aproc->waitLSN < bproc->waitLSN)
|
|
||||||
return 1;
|
|
||||||
else if (aproc->waitLSN > bproc->waitLSN)
|
|
||||||
return -1;
|
|
||||||
else
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Update waitLSN->minWaitedLSN according to the current state of
|
|
||||||
* waitLSN->waitersHeap.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
updateMinWaitedLSN(void)
|
|
||||||
{
|
|
||||||
XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
|
|
||||||
|
|
||||||
if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
|
|
||||||
{
|
|
||||||
pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
|
|
||||||
|
|
||||||
minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
|
|
||||||
}
|
|
||||||
|
|
||||||
pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Put the current process into the heap of LSN waiters.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
addLSNWaiter(XLogRecPtr lsn)
|
|
||||||
{
|
|
||||||
WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
|
|
||||||
|
|
||||||
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
Assert(!procInfo->inHeap);
|
|
||||||
|
|
||||||
procInfo->procno = MyProcNumber;
|
|
||||||
procInfo->waitLSN = lsn;
|
|
||||||
|
|
||||||
pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
|
|
||||||
procInfo->inHeap = true;
|
|
||||||
updateMinWaitedLSN();
|
|
||||||
|
|
||||||
LWLockRelease(WaitLSNLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Remove the current process from the heap of LSN waiters if it's there.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
deleteLSNWaiter(void)
|
|
||||||
{
|
|
||||||
WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
|
|
||||||
|
|
||||||
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
if (!procInfo->inHeap)
|
|
||||||
{
|
|
||||||
LWLockRelease(WaitLSNLock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode);
|
|
||||||
procInfo->inHeap = false;
|
|
||||||
updateMinWaitedLSN();
|
|
||||||
|
|
||||||
LWLockRelease(WaitLSNLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Remove waiters whose LSN has been replayed from the heap and set their
|
|
||||||
* latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
|
|
||||||
* and set latches for all waiters.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
WaitLSNWakeup(XLogRecPtr currentLSN)
|
|
||||||
{
|
|
||||||
int i;
|
|
||||||
ProcNumber *wakeUpProcs;
|
|
||||||
int numWakeUpProcs = 0;
|
|
||||||
|
|
||||||
wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends);
|
|
||||||
|
|
||||||
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Iterate the pairing heap of waiting processes till we find LSN not yet
|
|
||||||
* replayed. Record the process numbers to wake up, but to avoid holding
|
|
||||||
* the lock for too long, send the wakeups only after releasing the lock.
|
|
||||||
*/
|
|
||||||
while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
|
|
||||||
{
|
|
||||||
pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
|
|
||||||
WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
|
|
||||||
|
|
||||||
if (!XLogRecPtrIsInvalid(currentLSN) &&
|
|
||||||
procInfo->waitLSN > currentLSN)
|
|
||||||
break;
|
|
||||||
|
|
||||||
wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
|
|
||||||
(void) pairingheap_remove_first(&waitLSNState->waitersHeap);
|
|
||||||
procInfo->inHeap = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
updateMinWaitedLSN();
|
|
||||||
|
|
||||||
LWLockRelease(WaitLSNLock);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Set latches for processes, whose waited LSNs are already replayed. As
|
|
||||||
* the time consuming operations, we do it this outside of WaitLSNLock.
|
|
||||||
* This is actually fine because procLatch isn't ever freed, so we just
|
|
||||||
* can potentially set the wrong process' (or no process') latch.
|
|
||||||
*/
|
|
||||||
for (i = 0; i < numWakeUpProcs; i++)
|
|
||||||
{
|
|
||||||
SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
|
|
||||||
}
|
|
||||||
pfree(wakeUpProcs);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Delete our item from shmem array if any.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
WaitLSNCleanup(void)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We do a fast-path check of the 'inHeap' flag without the lock. This
|
|
||||||
* flag is set to true only by the process itself. So, it's only possible
|
|
||||||
* to get a false positive. But that will be eliminated by a recheck
|
|
||||||
* inside deleteLSNWaiter().
|
|
||||||
*/
|
|
||||||
if (waitLSNState->procInfos[MyProcNumber].inHeap)
|
|
||||||
deleteLSNWaiter();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Wait using MyLatch till the given LSN is replayed, the postmaster dies or
|
|
||||||
* timeout happens.
|
|
||||||
*/
|
|
||||||
WaitLSNResult
|
|
||||||
WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
|
|
||||||
{
|
|
||||||
XLogRecPtr currentLSN;
|
|
||||||
TimestampTz endtime = 0;
|
|
||||||
int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
|
|
||||||
|
|
||||||
/* Shouldn't be called when shmem isn't initialized */
|
|
||||||
Assert(waitLSNState);
|
|
||||||
|
|
||||||
/* Should have a valid proc number */
|
|
||||||
Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
|
|
||||||
|
|
||||||
if (!RecoveryInProgress())
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Recovery is not in progress. Given that we detected this in the
|
|
||||||
* very first check, this procedure was mistakenly called on primary.
|
|
||||||
* However, it's possible that standby was promoted concurrently to
|
|
||||||
* the procedure call, while target LSN is replayed. So, we still
|
|
||||||
* check the last replay LSN before reporting an error.
|
|
||||||
*/
|
|
||||||
if (targetLSN <= GetXLogReplayRecPtr(NULL))
|
|
||||||
return WAIT_LSN_RESULT_SUCCESS;
|
|
||||||
return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* If target LSN is already replayed, exit immediately */
|
|
||||||
if (targetLSN <= GetXLogReplayRecPtr(NULL))
|
|
||||||
return WAIT_LSN_RESULT_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (timeout > 0)
|
|
||||||
{
|
|
||||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
|
|
||||||
wake_events |= WL_TIMEOUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Add our process to the pairing heap of waiters. It might happen that
|
|
||||||
* target LSN gets replayed before we do. Another check at the beginning
|
|
||||||
* of the loop below prevents the race condition.
|
|
||||||
*/
|
|
||||||
addLSNWaiter(targetLSN);
|
|
||||||
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
int rc;
|
|
||||||
long delay_ms = 0;
|
|
||||||
|
|
||||||
/* Recheck that recovery is still in-progress */
|
|
||||||
if (!RecoveryInProgress())
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Recovery was ended, but recheck if target LSN was already
|
|
||||||
* replayed. See the comment regarding deleteLSNWaiter() below.
|
|
||||||
*/
|
|
||||||
deleteLSNWaiter();
|
|
||||||
currentLSN = GetXLogReplayRecPtr(NULL);
|
|
||||||
if (targetLSN <= currentLSN)
|
|
||||||
return WAIT_LSN_RESULT_SUCCESS;
|
|
||||||
return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* Check if the waited LSN has been replayed */
|
|
||||||
currentLSN = GetXLogReplayRecPtr(NULL);
|
|
||||||
if (targetLSN <= currentLSN)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the timeout value is specified, calculate the number of
|
|
||||||
* milliseconds before the timeout. Exit if the timeout is already
|
|
||||||
* reached.
|
|
||||||
*/
|
|
||||||
if (timeout > 0)
|
|
||||||
{
|
|
||||||
delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
|
|
||||||
if (delay_ms <= 0)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
|
|
||||||
rc = WaitLatch(MyLatch, wake_events, delay_ms,
|
|
||||||
WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Emergency bailout if postmaster has died. This is to avoid the
|
|
||||||
* necessity for manual cleanup of all postmaster children.
|
|
||||||
*/
|
|
||||||
if (rc & WL_POSTMASTER_DEATH)
|
|
||||||
ereport(FATAL,
|
|
||||||
(errcode(ERRCODE_ADMIN_SHUTDOWN),
|
|
||||||
errmsg("terminating connection due to unexpected postmaster exit"),
|
|
||||||
errcontext("while waiting for LSN replay")));
|
|
||||||
|
|
||||||
if (rc & WL_LATCH_SET)
|
|
||||||
ResetLatch(MyLatch);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Delete our process from the shared memory pairing heap. We might
|
|
||||||
* already be deleted by the startup process. The 'inHeap' flag prevents
|
|
||||||
* us from the double deletion.
|
|
||||||
*/
|
|
||||||
deleteLSNWaiter();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If we didn't reach the target LSN, we must be exited by timeout.
|
|
||||||
*/
|
|
||||||
if (targetLSN > currentLSN)
|
|
||||||
return WAIT_LSN_RESULT_TIMEOUT;
|
|
||||||
|
|
||||||
return WAIT_LSN_RESULT_SUCCESS;
|
|
||||||
}
|
|
@ -414,11 +414,6 @@ CREATE OR REPLACE FUNCTION
|
|||||||
json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
|
json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
|
||||||
RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE;
|
RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE;
|
||||||
|
|
||||||
--
-- pg_wal_replay_wait: procedure implemented by the internal routine of the
-- same name.  timeout and no_error are optional (defaults 0 and false).
--
CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(
    target_lsn pg_lsn,
    timeout int8 DEFAULT 0,
    no_error boolean DEFAULT false)
LANGUAGE internal
AS 'pg_wal_replay_wait';
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
|
CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
|
||||||
IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
|
IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
|
||||||
OUT lsn pg_lsn, OUT xid xid, OUT data text)
|
OUT lsn pg_lsn, OUT xid xid, OUT data text)
|
||||||
|
@ -44,26 +44,12 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg)
|
|||||||
pairingheap *heap;
|
pairingheap *heap;
|
||||||
|
|
||||||
heap = (pairingheap *) palloc(sizeof(pairingheap));
|
heap = (pairingheap *) palloc(sizeof(pairingheap));
|
||||||
pairingheap_initialize(heap, compare, arg);
|
|
||||||
|
|
||||||
return heap;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* pairingheap_initialize
|
|
||||||
*
|
|
||||||
* Same as pairingheap_allocate(), but initializes the pairing heap in-place
|
|
||||||
* rather than allocating a new chunk of memory. Useful to store the pairing
|
|
||||||
* heap in a shared memory.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare,
|
|
||||||
void *arg)
|
|
||||||
{
|
|
||||||
heap->ph_compare = compare;
|
heap->ph_compare = compare;
|
||||||
heap->ph_arg = arg;
|
heap->ph_arg = arg;
|
||||||
|
|
||||||
heap->ph_root = NULL;
|
heap->ph_root = NULL;
|
||||||
|
|
||||||
|
return heap;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
#include "access/twophase.h"
|
#include "access/twophase.h"
|
||||||
#include "access/xlogprefetcher.h"
|
#include "access/xlogprefetcher.h"
|
||||||
#include "access/xlogrecovery.h"
|
#include "access/xlogrecovery.h"
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "commands/async.h"
|
#include "commands/async.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "pgstat.h"
|
#include "pgstat.h"
|
||||||
@ -149,7 +148,6 @@ CalculateShmemSize(int *num_semaphores)
|
|||||||
size = add_size(size, WaitEventCustomShmemSize());
|
size = add_size(size, WaitEventCustomShmemSize());
|
||||||
size = add_size(size, InjectionPointShmemSize());
|
size = add_size(size, InjectionPointShmemSize());
|
||||||
size = add_size(size, SlotSyncShmemSize());
|
size = add_size(size, SlotSyncShmemSize());
|
||||||
size = add_size(size, WaitLSNShmemSize());
|
|
||||||
|
|
||||||
/* include additional requested shmem from preload libraries */
|
/* include additional requested shmem from preload libraries */
|
||||||
size = add_size(size, total_addin_request);
|
size = add_size(size, total_addin_request);
|
||||||
@ -342,7 +340,6 @@ CreateOrAttachShmemStructs(void)
|
|||||||
StatsShmemInit();
|
StatsShmemInit();
|
||||||
WaitEventCustomShmemInit();
|
WaitEventCustomShmemInit();
|
||||||
InjectionPointShmemInit();
|
InjectionPointShmemInit();
|
||||||
WaitLSNShmemInit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -36,7 +36,6 @@
|
|||||||
#include "access/transam.h"
|
#include "access/transam.h"
|
||||||
#include "access/twophase.h"
|
#include "access/twophase.h"
|
||||||
#include "access/xlogutils.h"
|
#include "access/xlogutils.h"
|
||||||
#include "access/xlogwait.h"
|
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "pgstat.h"
|
#include "pgstat.h"
|
||||||
#include "postmaster/autovacuum.h"
|
#include "postmaster/autovacuum.h"
|
||||||
@ -894,11 +893,6 @@ ProcKill(int code, Datum arg)
|
|||||||
*/
|
*/
|
||||||
LWLockReleaseAll();
|
LWLockReleaseAll();
|
||||||
|
|
||||||
/*
|
|
||||||
* Cleanup waiting for LSN if any.
|
|
||||||
*/
|
|
||||||
WaitLSNCleanup();
|
|
||||||
|
|
||||||
/* Cancel any pending condition variable sleep, too */
|
/* Cancel any pending condition variable sleep, too */
|
||||||
ConditionVariableCancelSleep();
|
ConditionVariableCancelSleep();
|
||||||
|
|
||||||
|
@ -1168,11 +1168,10 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
|
|||||||
MemoryContextSwitchTo(portal->portalContext);
|
MemoryContextSwitchTo(portal->portalContext);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Some utility commands (e.g., VACUUM, CALL pg_wal_replay_wait()) pop the
|
* Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
|
||||||
* ActiveSnapshot stack from under us, so don't complain if it's now
|
* under us, so don't complain if it's now empty. Otherwise, our snapshot
|
||||||
* empty. Otherwise, our snapshot should be the top one; pop it. Note
|
* should be the top one; pop it. Note that this could be a different
|
||||||
* that this could be a different snapshot from the one we made above; see
|
* snapshot from the one we made above; see EnsurePortalSnapshotExists.
|
||||||
* EnsurePortalSnapshotExists.
|
|
||||||
*/
|
*/
|
||||||
if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
|
if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
|
||||||
{
|
{
|
||||||
|
@ -87,7 +87,6 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
|
|||||||
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
|
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
|
||||||
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
|
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
|
||||||
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
|
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
|
||||||
WAIT_FOR_WAL_REPLAY "Waiting for a replay of the particular WAL position on the physical standby."
|
|
||||||
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
|
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
|
||||||
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
|
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
|
||||||
|
|
||||||
@ -346,7 +345,6 @@ WALSummarizer "Waiting to read or update WAL summarization state."
|
|||||||
DSMRegistry "Waiting to read or update the dynamic shared memory registry."
|
DSMRegistry "Waiting to read or update the dynamic shared memory registry."
|
||||||
InjectionPoint "Waiting to read or update information related to injection points."
|
InjectionPoint "Waiting to read or update information related to injection points."
|
||||||
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
|
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
|
||||||
WaitLSN "Waiting to read or update shared Wait-for-LSN state."
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
|
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
|
||||||
|
@ -1,89 +0,0 @@
|
|||||||
/*-------------------------------------------------------------------------
|
|
||||||
*
|
|
||||||
* xlogwait.h
|
|
||||||
* Declarations for LSN replay waiting routines.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2024, PostgreSQL Global Development Group
|
|
||||||
*
|
|
||||||
* src/include/access/xlogwait.h
|
|
||||||
*
|
|
||||||
*-------------------------------------------------------------------------
|
|
||||||
*/
|
|
||||||
#ifndef XLOG_WAIT_H
|
|
||||||
#define XLOG_WAIT_H
|
|
||||||
|
|
||||||
#include "lib/pairingheap.h"
|
|
||||||
#include "postgres.h"
|
|
||||||
#include "port/atomics.h"
|
|
||||||
#include "storage/procnumber.h"
|
|
||||||
#include "storage/spin.h"
|
|
||||||
#include "tcop/dest.h"
|
|
||||||
|
|
||||||
/*
|
|
||||||
* WaitLSNProcInfo - the shared memory structure representing information
|
|
||||||
* about the single process, which may wait for LSN replay. An item of
|
|
||||||
* waitLSN->procInfos array.
|
|
||||||
*/
|
|
||||||
typedef struct WaitLSNProcInfo
|
|
||||||
{
|
|
||||||
/* LSN, which this process is waiting for */
|
|
||||||
XLogRecPtr waitLSN;
|
|
||||||
|
|
||||||
/* Process to wake up once the waitLSN is replayed */
|
|
||||||
ProcNumber procno;
|
|
||||||
|
|
||||||
/* A pairing heap node for participation in waitLSNState->waitersHeap */
|
|
||||||
pairingheap_node phNode;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* A flag indicating that this item is present in
|
|
||||||
* waitLSNState->waitersHeap
|
|
||||||
*/
|
|
||||||
bool inHeap;
|
|
||||||
} WaitLSNProcInfo;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* WaitLSNState - the shared memory state for the replay LSN waiting facility.
|
|
||||||
*/
|
|
||||||
typedef struct WaitLSNState
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* The minimum LSN value some process is waiting for. Used for the
|
|
||||||
* fast-path checking if we need to wake up any waiters after replaying a
|
|
||||||
* WAL record. Could be read lock-less. Update protected by WaitLSNLock.
|
|
||||||
*/
|
|
||||||
pg_atomic_uint64 minWaitedLSN;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* A pairing heap of waiting processes order by LSN values (least LSN is
|
|
||||||
* on top). Protected by WaitLSNLock.
|
|
||||||
*/
|
|
||||||
pairingheap waitersHeap;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* An array with per-process information, indexed by the process number.
|
|
||||||
* Protected by WaitLSNLock.
|
|
||||||
*/
|
|
||||||
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
|
|
||||||
} WaitLSNState;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Result statuses for WaitForLSNReplay().
|
|
||||||
*/
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
WAIT_LSN_RESULT_SUCCESS, /* Target LSN is reached */
|
|
||||||
WAIT_LSN_RESULT_TIMEOUT, /* Timeout occurred */
|
|
||||||
WAIT_LSN_RESULT_NOT_IN_RECOVERY, /* Recovery ended before or during our
|
|
||||||
* wait */
|
|
||||||
} WaitLSNResult;
|
|
||||||
|
|
||||||
extern PGDLLIMPORT WaitLSNState *waitLSNState;
|
|
||||||
|
|
||||||
extern Size WaitLSNShmemSize(void);
|
|
||||||
extern void WaitLSNShmemInit(void);
|
|
||||||
extern void WaitLSNWakeup(XLogRecPtr currentLSN);
|
|
||||||
extern void WaitLSNCleanup(void);
|
|
||||||
extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout);
|
|
||||||
|
|
||||||
#endif /* XLOG_WAIT_H */
|
|
@ -57,6 +57,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/* yyyymmddN */
|
/* yyyymmddN */
|
||||||
#define CATALOG_VERSION_NO 202411011
|
#define CATALOG_VERSION_NO 202411042
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -6665,17 +6665,6 @@
|
|||||||
prorettype => 'text', proargtypes => '',
|
prorettype => 'text', proargtypes => '',
|
||||||
prosrc => 'pg_get_wal_replay_pause_state' },
|
prosrc => 'pg_get_wal_replay_pause_state' },
|
||||||
|
|
||||||
{ oid => '8593',
|
|
||||||
descr => 'wait for the target LSN to be replayed on standby with an optional timeout',
|
|
||||||
proname => 'pg_wal_replay_wait', prokind => 'p', prorettype => 'void',
|
|
||||||
proargtypes => 'pg_lsn int8 bool', proargnames => '{target_lsn,timeout,no_error}',
|
|
||||||
prosrc => 'pg_wal_replay_wait' },
|
|
||||||
{ oid => '8594',
|
|
||||||
descr => 'the last result for pg_wal_replay_wait()',
|
|
||||||
proname => 'pg_wal_replay_wait_status', prorettype => 'text',
|
|
||||||
proargtypes => '',
|
|
||||||
prosrc => 'pg_wal_replay_wait_status' },
|
|
||||||
|
|
||||||
{ oid => '6224', descr => 'get resource managers loaded in system',
|
{ oid => '6224', descr => 'get resource managers loaded in system',
|
||||||
proname => 'pg_get_wal_resource_managers', prorows => '50', proretset => 't',
|
proname => 'pg_get_wal_resource_managers', prorows => '50', proretset => 't',
|
||||||
provolatile => 'v', prorettype => 'record', proargtypes => '',
|
provolatile => 'v', prorettype => 'record', proargtypes => '',
|
||||||
|
@ -77,9 +77,6 @@ typedef struct pairingheap
|
|||||||
|
|
||||||
extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
|
extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
|
||||||
void *arg);
|
void *arg);
|
||||||
extern void pairingheap_initialize(pairingheap *heap,
|
|
||||||
pairingheap_comparator compare,
|
|
||||||
void *arg);
|
|
||||||
extern void pairingheap_free(pairingheap *heap);
|
extern void pairingheap_free(pairingheap *heap);
|
||||||
extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
|
extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
|
||||||
extern pairingheap_node *pairingheap_first(pairingheap *heap);
|
extern pairingheap_node *pairingheap_first(pairingheap *heap);
|
||||||
|
@ -83,4 +83,3 @@ PG_LWLOCK(49, WALSummarizer)
|
|||||||
PG_LWLOCK(50, DSMRegistry)
|
PG_LWLOCK(50, DSMRegistry)
|
||||||
PG_LWLOCK(51, InjectionPoint)
|
PG_LWLOCK(51, InjectionPoint)
|
||||||
PG_LWLOCK(52, SerialControl)
|
PG_LWLOCK(52, SerialControl)
|
||||||
PG_LWLOCK(53, WaitLSN)
|
|
||||||
|
@ -51,7 +51,6 @@ tests += {
|
|||||||
't/040_standby_failover_slots_sync.pl',
|
't/040_standby_failover_slots_sync.pl',
|
||||||
't/041_checkpoint_at_promote.pl',
|
't/041_checkpoint_at_promote.pl',
|
||||||
't/042_low_level_backup.pl',
|
't/042_low_level_backup.pl',
|
||||||
't/043_wal_replay_wait.pl',
|
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -1,225 +0,0 @@
|
|||||||
# Checks waiting for the lsn replay on standby using
|
|
||||||
# pg_wal_replay_wait() procedure.
|
|
||||||
use strict;
|
|
||||||
use warnings FATAL => 'all';
|
|
||||||
|
|
||||||
use PostgreSQL::Test::Cluster;
|
|
||||||
use PostgreSQL::Test::Utils;
|
|
||||||
use Test::More;
|
|
||||||
|
|
||||||
# Initialize primary node
|
|
||||||
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
|
|
||||||
$node_primary->init(allows_streaming => 1);
|
|
||||||
$node_primary->start;
|
|
||||||
|
|
||||||
# And some content and take a backup
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
|
|
||||||
my $backup_name = 'my_backup';
|
|
||||||
$node_primary->backup($backup_name);
|
|
||||||
|
|
||||||
# Create a streaming standby with a 1 second delay from the backup
|
|
||||||
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
|
|
||||||
my $delay = 1;
|
|
||||||
$node_standby->init_from_backup($node_primary, $backup_name,
|
|
||||||
has_streaming => 1);
|
|
||||||
$node_standby->append_conf(
|
|
||||||
'postgresql.conf', qq[
|
|
||||||
recovery_min_apply_delay = '${delay}s'
|
|
||||||
]);
|
|
||||||
$node_standby->start;
|
|
||||||
|
|
||||||
# 1. Make sure that pg_wal_replay_wait() works: add new content to
|
|
||||||
# primary and memorize primary's insert LSN, then wait for that LSN to be
|
|
||||||
# replayed on standby.
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"INSERT INTO wait_test VALUES (generate_series(11, 20))");
|
|
||||||
my $lsn1 =
|
|
||||||
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
|
|
||||||
my $output = $node_standby->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CALL pg_wal_replay_wait('${lsn1}', 1000000);
|
|
||||||
SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
|
|
||||||
]);
|
|
||||||
|
|
||||||
# Make sure the current LSN on standby is at least as big as the LSN we
|
|
||||||
# observed on primary's before.
|
|
||||||
ok($output >= 0,
|
|
||||||
"standby reached the same LSN as primary after pg_wal_replay_wait()");
|
|
||||||
|
|
||||||
# 2. Check that new data is visible after calling pg_wal_replay_wait()
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"INSERT INTO wait_test VALUES (generate_series(21, 30))");
|
|
||||||
my $lsn2 =
|
|
||||||
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
|
|
||||||
$output = $node_standby->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CALL pg_wal_replay_wait('${lsn2}');
|
|
||||||
SELECT count(*) FROM wait_test;
|
|
||||||
]);
|
|
||||||
|
|
||||||
# Make sure the count(*) on standby reflects the recent changes on primary
|
|
||||||
ok($output eq 30, "standby reached the same LSN as primary");
|
|
||||||
|
|
||||||
# 3. Check that waiting for unreachable LSN triggers the timeout. The
|
|
||||||
# unreachable LSN must be well in advance. So WAL records issued by
|
|
||||||
# the concurrent autovacuum could not affect that.
|
|
||||||
my $lsn3 =
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"SELECT pg_current_wal_insert_lsn() + 10000000000");
|
|
||||||
my $stderr;
|
|
||||||
$node_standby->safe_psql('postgres',
|
|
||||||
"CALL pg_wal_replay_wait('${lsn2}', 10);");
|
|
||||||
$node_standby->psql(
|
|
||||||
'postgres',
|
|
||||||
"CALL pg_wal_replay_wait('${lsn3}', 1000);",
|
|
||||||
stderr => \$stderr);
|
|
||||||
ok( $stderr =~ /timed out while waiting for target LSN/,
|
|
||||||
"get timeout on waiting for unreachable LSN");
|
|
||||||
|
|
||||||
$output = $node_standby->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CALL pg_wal_replay_wait('${lsn2}', 10, true);
|
|
||||||
SELECT pg_wal_replay_wait_status();]);
|
|
||||||
ok( $output eq "success",
|
|
||||||
"pg_wal_replay_wait_status() returns correct status after successful waiting"
|
|
||||||
);
|
|
||||||
$output = $node_standby->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CALL pg_wal_replay_wait('${lsn3}', 10, true);
|
|
||||||
SELECT pg_wal_replay_wait_status();]);
|
|
||||||
ok($output eq "timeout",
|
|
||||||
"pg_wal_replay_wait_status() returns correct status after timeout");
|
|
||||||
|
|
||||||
# 4. Check that pg_wal_replay_wait() triggers an error if called on primary,
|
|
||||||
# within another function, or inside a transaction with an isolation level
|
|
||||||
# higher than READ COMMITTED.
|
|
||||||
|
|
||||||
$node_primary->psql(
|
|
||||||
'postgres',
|
|
||||||
"CALL pg_wal_replay_wait('${lsn3}');",
|
|
||||||
stderr => \$stderr);
|
|
||||||
ok( $stderr =~ /recovery is not in progress/,
|
|
||||||
"get an error when running on the primary");
|
|
||||||
|
|
||||||
$node_standby->psql(
|
|
||||||
'postgres',
|
|
||||||
"BEGIN ISOLATION LEVEL REPEATABLE READ; CALL pg_wal_replay_wait('${lsn3}');",
|
|
||||||
stderr => \$stderr);
|
|
||||||
ok( $stderr =~
|
|
||||||
/pg_wal_replay_wait\(\) must be only called without an active or registered snapshot/,
|
|
||||||
"get an error when running in a transaction with an isolation level higher than REPEATABLE READ"
|
|
||||||
);
|
|
||||||
|
|
||||||
$node_primary->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$
|
|
||||||
BEGIN
|
|
||||||
CALL pg_wal_replay_wait(target_lsn);
|
|
||||||
END
|
|
||||||
\$\$
|
|
||||||
LANGUAGE plpgsql;
|
|
||||||
]);
|
|
||||||
|
|
||||||
$node_primary->wait_for_catchup($node_standby);
|
|
||||||
$node_standby->psql(
|
|
||||||
'postgres',
|
|
||||||
"SELECT pg_wal_replay_wait_wrap('${lsn3}');",
|
|
||||||
stderr => \$stderr);
|
|
||||||
ok( $stderr =~
|
|
||||||
/pg_wal_replay_wait\(\) must be only called without an active or registered snapshot/,
|
|
||||||
"get an error when running within another function");
|
|
||||||
|
|
||||||
# 5. Also, check the scenario of multiple LSN waiters. We make 5 background
|
|
||||||
# psql sessions each waiting for a corresponding insertion. When waiting is
|
|
||||||
# finished, stored procedures logs if there are visible as many rows as
|
|
||||||
# should be.
|
|
||||||
$node_primary->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
|
|
||||||
DECLARE
|
|
||||||
count int;
|
|
||||||
BEGIN
|
|
||||||
SELECT count(*) FROM wait_test INTO count;
|
|
||||||
IF count >= 31 + i THEN
|
|
||||||
RAISE LOG 'count %', i;
|
|
||||||
END IF;
|
|
||||||
END
|
|
||||||
\$\$
|
|
||||||
LANGUAGE plpgsql;
|
|
||||||
]);
|
|
||||||
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
|
|
||||||
my @psql_sessions;
|
|
||||||
for (my $i = 0; $i < 5; $i++)
|
|
||||||
{
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"INSERT INTO wait_test VALUES (${i});");
|
|
||||||
my $lsn =
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"SELECT pg_current_wal_insert_lsn()");
|
|
||||||
$psql_sessions[$i] = $node_standby->background_psql('postgres');
|
|
||||||
$psql_sessions[$i]->query_until(
|
|
||||||
qr/start/, qq[
|
|
||||||
\\echo start
|
|
||||||
CALL pg_wal_replay_wait('${lsn}');
|
|
||||||
SELECT log_count(${i});
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
my $log_offset = -s $node_standby->logfile;
|
|
||||||
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
|
|
||||||
for (my $i = 0; $i < 5; $i++)
|
|
||||||
{
|
|
||||||
$node_standby->wait_for_log("count ${i}", $log_offset);
|
|
||||||
$psql_sessions[$i]->quit;
|
|
||||||
}
|
|
||||||
|
|
||||||
ok(1, 'multiple LSN waiters reported consistent data');
|
|
||||||
|
|
||||||
# 6. Check that the standby promotion terminates the wait on LSN. Start
|
|
||||||
# waiting for an unreachable LSN then promote. Check the log for the relevant
|
|
||||||
# error message. Also, check that waiting for already replayed LSN doesn't
|
|
||||||
# cause an error even after promotion.
|
|
||||||
my $lsn4 =
|
|
||||||
$node_primary->safe_psql('postgres',
|
|
||||||
"SELECT pg_current_wal_insert_lsn() + 10000000000");
|
|
||||||
my $lsn5 =
|
|
||||||
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
|
|
||||||
my $psql_session = $node_standby->background_psql('postgres');
|
|
||||||
$psql_session->query_until(
|
|
||||||
qr/start/, qq[
|
|
||||||
\\echo start
|
|
||||||
CALL pg_wal_replay_wait('${lsn4}');
|
|
||||||
]);
|
|
||||||
|
|
||||||
# Make sure standby will be promoted at least at the primary insert LSN we
|
|
||||||
# have just observed. Use pg_switch_wal() to force the insert LSN to be
|
|
||||||
# written then wait for standby to catchup.
|
|
||||||
$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
|
|
||||||
$node_primary->wait_for_catchup($node_standby);
|
|
||||||
|
|
||||||
$log_offset = -s $node_standby->logfile;
|
|
||||||
$node_standby->promote;
|
|
||||||
$node_standby->wait_for_log('recovery is not in progress', $log_offset);
|
|
||||||
|
|
||||||
ok(1, 'got error after standby promote');
|
|
||||||
|
|
||||||
$node_standby->safe_psql('postgres', "CALL pg_wal_replay_wait('${lsn5}');");
|
|
||||||
|
|
||||||
ok(1, 'wait for already replayed LSN exits immediately even after promotion');
|
|
||||||
|
|
||||||
$output = $node_standby->safe_psql(
|
|
||||||
'postgres', qq[
|
|
||||||
CALL pg_wal_replay_wait('${lsn4}', 10, true);
|
|
||||||
SELECT pg_wal_replay_wait_status();]);
|
|
||||||
ok( $output eq "not in recovery",
|
|
||||||
"pg_wal_replay_wait_status() returns correct status after standby promotion"
|
|
||||||
);
|
|
||||||
|
|
||||||
$node_standby->stop;
|
|
||||||
$node_primary->stop;
|
|
||||||
|
|
||||||
# If we send \q with $psql_session->quit the command can be sent to the session
|
|
||||||
# already closed. So \q is in initial script, here we only finish IPC::Run.
|
|
||||||
$psql_session->{run}->finish;
|
|
||||||
|
|
||||||
done_testing();
|
|
@ -3126,9 +3126,6 @@ WaitEventIO
|
|||||||
WaitEventIPC
|
WaitEventIPC
|
||||||
WaitEventSet
|
WaitEventSet
|
||||||
WaitEventTimeout
|
WaitEventTimeout
|
||||||
WaitLSNProcInfo
|
|
||||||
WaitLSNResult
|
|
||||||
WaitLSNState
|
|
||||||
WaitPMResult
|
WaitPMResult
|
||||||
WalCloseMethod
|
WalCloseMethod
|
||||||
WalCompression
|
WalCompression
|
||||||
|
Reference in New Issue
Block a user