1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Implement waiting for given lsn at transaction start

This commit adds following optional clause to BEGIN and START TRANSACTION
commands.

  WAIT FOR LSN lsn [ TIMEOUT timeout ]

New clause pospones transaction start till given lsn is applied on standby.
This clause allows user be sure, that changes previously made on primary would
be visible on standby.

New shared memory struct is used to track awaited lsn per backend.  Recovery
process wakes up backend once required lsn is applied.

Author: Ivan Kartyshov, Anna Akenteva
Reviewed-by: Craig Ringer, Thomas Munro, Robert Haas, Kyotaro Horiguchi
Reviewed-by: Masahiko Sawada, Ants Aasma, Dmitry Ivanov, Simon Riggs
Reviewed-by: Amit Kapila, Alexander Korotkov
Discussion: https://postgr.es/m/0240c26c-9f84-30ea-fca9-93ab2df5f305%40postgrespro.ru
This commit is contained in:
Alexander Korotkov
2020-04-07 23:51:10 +03:00
parent 357889eb17
commit 0f5ca02f53
20 changed files with 585 additions and 10 deletions

View File

@ -42,6 +42,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "executor/instrument.h"
#include "miscadmin.h"
@ -7154,6 +7155,7 @@ StartupXLOG(void)
do
{
bool switchedTLI = false;
XLogRecPtr minWaitedLSN;
#ifdef WAL_DEBUG
if (XLOG_DEBUG ||
@ -7357,6 +7359,17 @@ StartupXLOG(void)
break;
}
/*
* If we replayed an LSN that someone was waiting for, set
* latches in shared memory array to notify the waiter.
*/
minWaitedLSN = WaitLSNGetMin();
if (!XLogRecPtrIsInvalid(minWaitedLSN) &&
minWaitedLSN <= XLogCtl->lastReplayedEndRecPtr)
{
WaitLSNSetLatch(XLogCtl->lastReplayedEndRecPtr);
}
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);

View File

@ -57,6 +57,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
view.o
view.o \
wait.o
include $(top_srcdir)/src/backend/common.mk

295
src/backend/commands/wait.c Normal file
View File

@ -0,0 +1,295 @@
/*-------------------------------------------------------------------------
*
* wait.c
* Implements WAIT FOR clause for BEGIN and START TRANSACTION commands.
* This clause allows waiting for given LSN to be replayed on standby.
*
* Copyright (c) 2020, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/commands/wait.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "commands/wait.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/backendid.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/timestamp.h"
/*
* Shared memory structure representing information about LSNs, which backends
* are waiting for replay.
*/
typedef struct
{
slock_t mutex; /* mutex protecting the fields below */
int max_backend_id; /* max backend_id present in lsns[] */
pg_atomic_uint64 min_lsn; /* minimal waited LSN */
/* per-backend array of waited LSNs */
XLogRecPtr lsns[FLEXIBLE_ARRAY_MEMBER];
} WaitLSNState;
static WaitLSNState * state;
/*
* Add the wait event of the current backend to shared memory array
*/
static void
WaitLSNAdd(XLogRecPtr lsn_to_wait)
{
SpinLockAcquire(&state->mutex);
if (state->max_backend_id < MyBackendId)
state->max_backend_id = MyBackendId;
state->lsns[MyBackendId] = lsn_to_wait;
if (lsn_to_wait < state->min_lsn.value)
state->min_lsn.value = lsn_to_wait;
SpinLockRelease(&state->mutex);
}
/*
* Delete wait event of the current backend from the shared memory array.
*/
void
WaitLSNDelete(void)
{
int i;
XLogRecPtr deleted_lsn;
SpinLockAcquire(&state->mutex);
deleted_lsn = state->lsns[MyBackendId];
state->lsns[MyBackendId] = InvalidXLogRecPtr;
/* If we are deleting the minimal LSN, then choose the next min_lsn */
if (!XLogRecPtrIsInvalid(deleted_lsn) &&
deleted_lsn == state->min_lsn.value)
{
state->min_lsn.value = InvalidXLogRecPtr;
for (i = 2; i <= state->max_backend_id; i++)
{
if (!XLogRecPtrIsInvalid(state->lsns[i]) &&
(state->lsns[i] < state->min_lsn.value ||
XLogRecPtrIsInvalid(state->min_lsn.value)))
{
state->min_lsn.value = state->lsns[i];
}
}
}
/* If deleting from the end of the array, shorten the array's used part */
if (state->max_backend_id == MyBackendId)
{
for (i = (MyBackendId); i >= 2; i--)
if (!XLogRecPtrIsInvalid(state->lsns[i]))
{
state->max_backend_id = i;
break;
}
}
SpinLockRelease(&state->mutex);
}
/*
* Report amount of shared memory space needed for WaitLSNState
*/
Size
WaitLSNShmemSize(void)
{
Size size;
size = offsetof(WaitLSNState, lsns);
size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
return size;
}
/*
* Initialize an shared memory structure for waiting for LSN
*/
void
WaitLSNShmemInit(void)
{
bool found;
uint32 i;
state = (WaitLSNState *) ShmemInitStruct("pg_wait_lsn",
WaitLSNShmemSize(),
&found);
if (!found)
{
SpinLockInit(&state->mutex);
for (i = 0; i < (MaxBackends + 1); i++)
state->lsns[i] = InvalidXLogRecPtr;
state->max_backend_id = 0;
pg_atomic_init_u64(&state->min_lsn, InvalidXLogRecPtr);
}
}
/*
* Set latches in shared memory to signal that new LSN has been replayed
*/
void
WaitLSNSetLatch(XLogRecPtr cur_lsn)
{
uint32 i;
int max_backend_id;
PGPROC *backend;
SpinLockAcquire(&state->mutex);
max_backend_id = state->max_backend_id;
for (i = 2; i <= max_backend_id; i++)
{
backend = BackendIdGetProc(i);
if (backend && state->lsns[i] != 0 &&
state->lsns[i] <= cur_lsn)
{
SetLatch(&backend->procLatch);
}
}
SpinLockRelease(&state->mutex);
}
/*
* Get minimal LSN that some backend is waiting for
*/
XLogRecPtr
WaitLSNGetMin(void)
{
return state->min_lsn.value;
}
/*
* On WAIT use a latch to wait till LSN is replayed, postmaster dies or timeout
* happens. Timeout is specified in milliseconds. Returns true if LSN was
* reached and false otherwise.
*/
bool
WaitLSNUtility(XLogRecPtr target_lsn, const int timeout_ms)
{
XLogRecPtr cur_lsn;
int latch_events;
float8 endtime;
bool res = false;
bool wait_forever = (timeout_ms <= 0);
endtime = GetNowFloat() + timeout_ms / 1000.0;
latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
/* Check if we already reached the needed LSN */
cur_lsn = GetXLogReplayRecPtr(NULL);
if (cur_lsn >= target_lsn)
return true;
WaitLSNAdd(target_lsn);
ResetLatch(MyLatch);
/* Recheck if LSN was reached while WaitLSNAdd() and ResetLatch() */
cur_lsn = GetXLogReplayRecPtr(NULL);
if (cur_lsn >= target_lsn)
return true;
for (;;)
{
int rc;
float8 time_left = 0;
long time_left_ms = 0;
time_left = endtime - GetNowFloat();
/* Use 1 second as the default timeout to check for interrupts */
if (wait_forever || time_left < 0 || time_left > 1.0)
time_left_ms = 1000;
else
time_left_ms = (long) ceil(time_left * 1000.0);
/* If interrupt, LockErrorCleanup() will do WaitLSNDelete() for us */
CHECK_FOR_INTERRUPTS();
/* If postmaster dies, finish immediately */
if (!PostmasterIsAlive())
break;
rc = WaitLatch(MyLatch, latch_events, time_left_ms,
WAIT_EVENT_CLIENT_READ);
ResetLatch(MyLatch);
if (rc & WL_LATCH_SET)
cur_lsn = GetXLogReplayRecPtr(NULL);
if (rc & WL_TIMEOUT)
{
time_left = endtime - GetNowFloat();
/* If the time specified by user has passed, stop waiting */
if (!wait_forever && time_left <= 0.0)
break;
cur_lsn = GetXLogReplayRecPtr(NULL);
}
/* If LSN has been replayed */
if (target_lsn <= cur_lsn)
break;
}
WaitLSNDelete();
if (cur_lsn < target_lsn)
ereport(WARNING,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("didn't start transaction because LSN was not reached"),
errhint("Try to increase wait timeout.")));
else
res = true;
return res;
}
/*
* Implementation of WAIT FOR clause for BEGIN and START TRANSACTION commands
*/
int
WaitLSNMain(WaitClause *stmt, DestReceiver *dest)
{
TupleDesc tupdesc;
TupOutputState *tstate;
XLogRecPtr target_lsn;
bool res = false;
target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
CStringGetDatum(stmt->lsn)));
res = WaitLSNUtility(target_lsn, stmt->timeout);
/* Need a tuple descriptor representing a single TEXT column */
tupdesc = CreateTemplateTupleDesc(1);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
/* Prepare for projection of tuples */
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
/* Send the result */
do_text_output_oneline(tstate, res ? "t" : "f");
end_tup_output(tstate);
return res;
}

View File

@ -3748,10 +3748,22 @@ _copyTransactionStmt(const TransactionStmt *from)
COPY_STRING_FIELD(savepoint_name);
COPY_STRING_FIELD(gid);
COPY_SCALAR_FIELD(chain);
COPY_NODE_FIELD(wait);
return newnode;
}
static WaitClause *
_copyWaitClause(const WaitClause *from)
{
WaitClause *newnode = makeNode(WaitClause);
COPY_STRING_FIELD(lsn);
COPY_SCALAR_FIELD(timeout);
return newnode;
};
static CompositeTypeStmt *
_copyCompositeTypeStmt(const CompositeTypeStmt *from)
{
@ -5339,6 +5351,9 @@ copyObjectImpl(const void *from)
case T_TransactionStmt:
retval = _copyTransactionStmt(from);
break;
case T_WaitClause:
retval = _copyWaitClause(from);
break;
case T_CompositeTypeStmt:
retval = _copyCompositeTypeStmt(from);
break;

View File

@ -1541,6 +1541,16 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b)
COMPARE_STRING_FIELD(savepoint_name);
COMPARE_STRING_FIELD(gid);
COMPARE_SCALAR_FIELD(chain);
COMPARE_NODE_FIELD(wait);
return true;
}
static bool
_equalWaitClause(const WaitClause *a, const WaitClause *b)
{
COMPARE_STRING_FIELD(lsn);
COMPARE_SCALAR_FIELD(timeout);
return true;
}
@ -3391,6 +3401,9 @@ equal(const void *a, const void *b)
case T_TransactionStmt:
retval = _equalTransactionStmt(a, b);
break;
case T_WaitClause:
retval = _equalWaitClause(a, b);
break;
case T_CompositeTypeStmt:
retval = _equalCompositeTypeStmt(a, b);
break;

View File

@ -2784,6 +2784,28 @@ _outDefElem(StringInfo str, const DefElem *node)
WRITE_LOCATION_FIELD(location);
}
static void
_outTransactionStmt(StringInfo str, const TransactionStmt *node)
{
WRITE_NODE_TYPE("TRANSACTIONSTMT");
WRITE_STRING_FIELD(savepoint_name);
WRITE_STRING_FIELD(gid);
WRITE_NODE_FIELD(options);
WRITE_BOOL_FIELD(chain);
WRITE_ENUM_FIELD(kind, TransactionStmtKind);
WRITE_NODE_FIELD(wait);
}
static void
_outWaitClause(StringInfo str, const WaitClause *node)
{
WRITE_NODE_TYPE("WAITCLAUSE");
WRITE_STRING_FIELD(lsn);
WRITE_UINT_FIELD(timeout);
}
static void
_outTableLikeClause(StringInfo str, const TableLikeClause *node)
{
@ -4334,6 +4356,12 @@ outNode(StringInfo str, const void *obj)
case T_PartitionRangeDatum:
_outPartitionRangeDatum(str, obj);
break;
case T_TransactionStmt:
_outTransactionStmt(str, obj);
break;
case T_WaitClause:
_outWaitClause(str, obj);
break;
default:

View File

@ -601,6 +601,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
%type <ival> wait_time
%type <node> wait_for
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
@ -670,7 +672,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@ -701,7 +703,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@ -711,7 +713,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
WAIT WHEN WHERE WHITESPACE_P WINDOW
WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@ -9955,18 +9958,20 @@ TransactionStmt:
n->chain = $3;
$$ = (Node *)n;
}
| BEGIN_P opt_transaction transaction_mode_list_or_empty
| BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
n->wait = $4;
$$ = (Node *)n;
}
| START TRANSACTION transaction_mode_list_or_empty
| START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
n->wait = $4;
$$ = (Node *)n;
}
| COMMIT opt_transaction opt_transaction_chain
@ -14240,6 +14245,25 @@ xml_passing_mech:
| BY VALUE_P
;
/*
* WAIT FOR clause of BEGIN and START TRANSACTION statements
*/
wait_for:
WAIT FOR LSN Sconst wait_time
{
WaitClause *n = makeNode(WaitClause);
n->lsn = $4;
n->timeout = $5;
$$ = (Node *)n;
}
| /* EMPTY */ { $$ = NULL; }
;
wait_time:
TIMEOUT Iconst { $$ = $2; }
| /* EMPTY */ { $$ = 0; }
;
/*
* Aggregate decoration clauses
@ -15391,6 +15415,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
| LSN
| MAPPING
| MATCH
| MATERIALIZED
@ -15518,6 +15543,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
| TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@ -15544,6 +15570,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
| WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT

View File

@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, WaitLSNShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
/*
* Init array of Latches in shared memory for WAIT
*/
WaitLSNShmemInit();
#ifdef EXEC_BACKEND
/*

View File

@ -38,6 +38,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@ -717,6 +718,9 @@ LockErrorCleanup(void)
AbortStrongLockAcquire();
/* If BEGIN WAIT FOR LSN was interrupted, then stop waiting for that LSN */
WaitLSNDelete();
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
{

View File

@ -57,6 +57,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@ -591,6 +592,18 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
WaitClause *waitstmt = (WaitClause *) stmt->wait;
/* WAIT FOR cannot be used on master */
if (stmt->wait && !RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WAIT FOR can only be "
"used on standby")));
/* If needed to WAIT FOR something but failed */
if (stmt->wait && WaitLSNMain(waitstmt, dest) == 0)
break;
BeginTransactionBlock();
foreach(lc, stmt->options)

View File

@ -372,8 +372,6 @@ pg_sleep(PG_FUNCTION_ARGS)
* less than the specified time when WaitLatch is terminated early by a
* non-query-canceling signal such as SIGHUP.
*/
#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
endtime = GetNowFloat() + secs;
for (;;)