1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-16 15:02:33 +03:00

Allow read only connections during recovery, known as Hot Standby.

Enabled by recovery_connections = on (default) and forcing archive recovery using a recovery.conf. Recovery processing now emulates the original transactions as they are replayed, providing full locking and MVCC behaviour for read only queries. Recovery must enter consistent state before connections are allowed, so there is a delay, typically short, before connections succeed. Replay of recovering transactions can conflict and in some cases deadlock with queries during recovery; these result in query cancellation after max_standby_delay seconds have expired. Infrastructure changes have minor effects on normal running, though introduce four new types of WAL record.

New test mode "make standbycheck" allows regression tests of static command behaviour on a standby server while in recovery. Typical and extreme dynamic behaviours have been checked via code inspection and manual testing. Few port specific behaviours have been utilised, though primary testing has been on Linux only so far.

This commit is the basic patch. Additional changes will follow in this release to enhance some aspects of behaviour, notably improved handling of conflicts, deadlock detection and query cancellation. Changes to VACUUM FULL are also required.

Simon Riggs, with significant and lengthy review by Heikki Linnakangas, including streamlined redesign of snapshot creation and two-phase commit.

Important contributions from Florian Pflug, Mark Kirkwood, Merlin Moncure, Greg Stark, Gianni Ciolli, Gabriele Bartolini, Hannu Krosing, Robert Haas, Tatsuo Ishii, Hiroyuki Yamada plus support and feedback from many other community members.
This commit is contained in:
Simon Riggs
2009-12-19 01:32:45 +00:00
parent 78a09145e0
commit efc16ea520
87 changed files with 6165 additions and 428 deletions

View File

@@ -1,4 +1,4 @@
$PostgreSQL: pgsql/src/backend/access/transam/README,v 1.12 2008/10/20 19:18:18 alvherre Exp $
$PostgreSQL: pgsql/src/backend/access/transam/README,v 1.13 2009/12/19 01:32:33 sriggs Exp $
The Transaction System
======================
@@ -649,3 +649,34 @@ fsync it down to disk without any sort of interlock, as soon as it finishes
the bulk update. However, all these paths are designed to write data that
no other transaction can see until after T1 commits. The situation is thus
not different from ordinary WAL-logged updates.
Transaction Emulation during Recovery
-------------------------------------
During Recovery we replay transaction changes in the order they occurred.
As part of this replay we emulate some transactional behaviour, so that
read only backends can take MVCC snapshots. We do this by maintaining a
list of XIDs belonging to transactions that are being replayed, so that
each transaction that has recorded WAL records for database writes exist
in the array until it commits. Further details are given in comments in
procarray.c.
Many actions write no WAL records at all, for example read only transactions.
These have no effect on MVCC in recovery and we can pretend they never
occurred at all. Subtransaction commit does not write a WAL record either
and has very little effect, since lock waiters need to wait for the
parent transaction to complete.
Not all transactional behaviour is emulated, for example we do not insert
a transaction entry into the lock table, nor do we maintain the transaction
stack in memory. Clog entries are made normally. Multitrans is not maintained
because its purpose is to record tuple level locks that an application has
requested to prevent write locks. Since write locks cannot be obtained at all,
there is never any conflict and so there is no reason to update multitrans.
Subtrans is maintained during recovery but the details of the transaction
tree are ignored and all subtransactions reference the top-level TransactionId
directly. Since commit is atomic this provides correct lock wait behaviour
yet simplifies emulation of subtransactions considerably.
Further details on locking mechanics in recovery are given in comments
with the Lock rmgr code.

View File

@@ -26,7 +26,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.53 2009/06/11 14:48:54 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.54 2009/12/19 01:32:33 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -574,7 +574,7 @@ ExtendCLOG(TransactionId newestXact)
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCLOGPage(pageno, true);
ZeroCLOGPage(pageno, !InRecovery);
LWLockRelease(CLogControlLock);
}

View File

@@ -42,7 +42,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/multixact.c,v 1.32 2009/11/23 09:58:36 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/multixact.c,v 1.33 2009/12/19 01:32:33 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -59,6 +59,7 @@
#include "storage/backendid.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -220,7 +221,6 @@ static MultiXactId GetNewMultiXactId(int nxids, MultiXactOffset *offset);
static MultiXactId mXactCacheGetBySet(int nxids, TransactionId *xids);
static int mXactCacheGetById(MultiXactId multi, TransactionId **xids);
static void mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids);
static int xidComparator(const void *arg1, const void *arg2);
#ifdef MULTIXACT_DEBUG
static char *mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids);
@@ -1221,27 +1221,6 @@ mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids)
MXactCache = entry;
}
/*
* xidComparator
* qsort comparison function for XIDs
*
* We don't need to use wraparound comparison for XIDs, and indeed must
* not do so since that does not respect the triangle inequality! Any
* old sort order will do.
*/
static int
xidComparator(const void *arg1, const void *arg2)
{
TransactionId xid1 = *(const TransactionId *) arg1;
TransactionId xid2 = *(const TransactionId *) arg2;
if (xid1 > xid2)
return 1;
if (xid1 < xid2)
return -1;
return 0;
}
#ifdef MULTIXACT_DEBUG
static char *
mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids)
@@ -2051,11 +2030,18 @@ multixact_redo(XLogRecPtr lsn, XLogRecord *record)
if (TransactionIdPrecedes(max_xid, xids[i]))
max_xid = xids[i];
}
/* We don't expect anyone else to modify nextXid, hence startup process
* doesn't need to hold a lock while checking this. We still acquire
* the lock to modify it, though.
*/
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = max_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
}
else

View File

@@ -79,3 +79,10 @@
#
#
#---------------------------------------------------------------------------
# HOT STANDBY PARAMETERS
#---------------------------------------------------------------------------
#
# If you want to enable read-only connections during recovery, enable
# recovery_connections in postgresql.conf
#
#---------------------------------------------------------------------------

View File

@@ -3,7 +3,7 @@
*
* Resource managers definition
*
* $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.27 2008/11/19 10:34:50 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.28 2009/12/19 01:32:33 sriggs Exp $
*/
#include "postgres.h"
@@ -21,6 +21,7 @@
#include "commands/sequence.h"
#include "commands/tablespace.h"
#include "storage/freespace.h"
#include "storage/standby.h"
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
@@ -32,7 +33,7 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
{"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
{"Reserved 7", NULL, NULL, NULL, NULL, NULL},
{"Reserved 8", NULL, NULL, NULL, NULL, NULL},
{"Standby", standby_redo, standby_desc, NULL, NULL, NULL},
{"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
{"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},

View File

@@ -22,7 +22,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/subtrans.c,v 1.24 2009/01/01 17:23:36 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/subtrans.c,v 1.25 2009/12/19 01:32:33 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -68,15 +68,19 @@ static bool SubTransPagePrecedes(int page1, int page2);
/*
* Record the parent of a subtransaction in the subtrans log.
*
* In some cases we may need to overwrite an existing value.
*/
void
SubTransSetParent(TransactionId xid, TransactionId parent)
SubTransSetParent(TransactionId xid, TransactionId parent, bool overwriteOK)
{
int pageno = TransactionIdToPage(xid);
int entryno = TransactionIdToEntry(xid);
int slotno;
TransactionId *ptr;
Assert(TransactionIdIsValid(parent));
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
@@ -84,7 +88,8 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
ptr += entryno;
/* Current state should be 0 */
Assert(*ptr == InvalidTransactionId);
Assert(*ptr == InvalidTransactionId ||
(*ptr == parent && overwriteOK));
*ptr = parent;

View File

@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.56 2009/11/23 09:58:36 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.57 2009/12/19 01:32:33 sriggs Exp $
*
* NOTES
* Each global transaction is associated with a global transaction
@@ -57,6 +57,7 @@
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -144,7 +145,10 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileNode *rels);
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
bool initfileinval);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
@@ -736,10 +740,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
* 2. TransactionId[] (subtransactions)
* 3. RelFileNode[] (files to be deleted at commit)
* 4. RelFileNode[] (files to be deleted at abort)
* 5. TwoPhaseRecordOnDisk
* 6. ...
* 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
* 8. CRC32
* 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
* 6. TwoPhaseRecordOnDisk
* 7. ...
* 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
* 9. CRC32
*
* Each segment except the final CRC32 is MAXALIGN'd.
*/
@@ -760,6 +765,8 @@ typedef struct TwoPhaseFileHeader
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
char gid[GIDSIZE]; /* GID for transaction */
} TwoPhaseFileHeader;
@@ -835,6 +842,7 @@ StartPrepare(GlobalTransaction gxact)
TransactionId *children;
RelFileNode *commitrels;
RelFileNode *abortrels;
SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
records.head = palloc0(sizeof(XLogRecData));
@@ -859,11 +867,16 @@ StartPrepare(GlobalTransaction gxact)
hdr.nsubxacts = xactGetCommittedChildren(&children);
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
&hdr.initfileinval);
StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
/* Add the additional info about subxacts and deletable files */
/*
* Add the additional info about subxacts, deletable files and
* cache invalidation messages.
*/
if (hdr.nsubxacts > 0)
{
save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
@@ -880,6 +893,12 @@ StartPrepare(GlobalTransaction gxact)
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
pfree(abortrels);
}
if (hdr.ninvalmsgs > 0)
{
save_state_data(invalmsgs,
hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
pfree(invalmsgs);
}
}
/*
@@ -1071,7 +1090,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* contents of the file. Otherwise return NULL.
*/
static char *
ReadTwoPhaseFile(TransactionId xid)
ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
{
char path[MAXPGPATH];
char *buf;
@@ -1087,10 +1106,11 @@ ReadTwoPhaseFile(TransactionId xid)
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
{
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not open two-phase state file \"%s\": %m",
path)));
if (give_warnings)
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not open two-phase state file \"%s\": %m",
path)));
return NULL;
}
@@ -1103,10 +1123,11 @@ ReadTwoPhaseFile(TransactionId xid)
if (fstat(fd, &stat))
{
close(fd);
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not stat two-phase state file \"%s\": %m",
path)));
if (give_warnings)
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not stat two-phase state file \"%s\": %m",
path)));
return NULL;
}
@@ -1134,10 +1155,11 @@ ReadTwoPhaseFile(TransactionId xid)
if (read(fd, buf, stat.st_size) != stat.st_size)
{
close(fd);
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not read two-phase state file \"%s\": %m",
path)));
if (give_warnings)
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not read two-phase state file \"%s\": %m",
path)));
pfree(buf);
return NULL;
}
@@ -1166,6 +1188,30 @@ ReadTwoPhaseFile(TransactionId xid)
return buf;
}
/*
* Confirms an xid is prepared, during recovery
*/
bool
StandbyTransactionIdIsPrepared(TransactionId xid)
{
char *buf;
TwoPhaseFileHeader *hdr;
bool result;
Assert(TransactionIdIsValid(xid));
/* Read and validate file */
buf = ReadTwoPhaseFile(xid, false);
if (buf == NULL)
return false;
/* Check header also */
hdr = (TwoPhaseFileHeader *) buf;
result = TransactionIdEquals(hdr->xid, xid);
pfree(buf);
return result;
}
/*
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
@@ -1184,6 +1230,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
RelFileNode *abortrels;
RelFileNode *delrels;
int ndelrels;
SharedInvalidationMessage *invalmsgs;
int i;
/*
@@ -1196,7 +1243,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* Read and validate the state file
*/
buf = ReadTwoPhaseFile(xid);
buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
@@ -1215,6 +1262,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
abortrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
invalmsgs = (SharedInvalidationMessage *) bufptr;
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
/* compute latestXid among all children */
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
@@ -1230,7 +1279,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
if (isCommit)
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels);
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
hdr->initfileinval);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
@@ -1277,6 +1328,18 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
smgrclose(srel);
}
/*
* Handle cache invalidation messages.
*
* Relcache init file invalidation requires processing both
* before and after we send the SI messages. See AtEOXact_Inval()
*/
if (hdr->initfileinval)
RelationCacheInitFileInvalidate(true);
SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
if (hdr->initfileinval)
RelationCacheInitFileInvalidate(false);
/* And now do the callbacks */
if (isCommit)
ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
@@ -1528,14 +1591,21 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* Our other responsibility is to determine and return the oldest valid XID
* among the prepared xacts (if none, return ShmemVariableCache->nextXid).
* This is needed to synchronize pg_subtrans startup properly.
*
* If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
* top-level xids is stored in *xids_p. The number of entries in the array
* is returned in *nxids_p.
*/
TransactionId
PrescanPreparedTransactions(void)
PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId result = origNextXid;
DIR *cldir;
struct dirent *clde;
TransactionId *xids = NULL;
int nxids = 0;
int allocsize = 0;
cldir = AllocateDir(TWOPHASE_DIR);
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
@@ -1567,7 +1637,7 @@ PrescanPreparedTransactions(void)
*/
/* Read and validate file */
buf = ReadTwoPhaseFile(xid);
buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
{
ereport(WARNING,
@@ -1615,11 +1685,36 @@ PrescanPreparedTransactions(void)
}
}
if (xids_p)
{
if (nxids == allocsize)
{
if (nxids == 0)
{
allocsize = 10;
xids = palloc(allocsize * sizeof(TransactionId));
}
else
{
allocsize = allocsize * 2;
xids = repalloc(xids, allocsize * sizeof(TransactionId));
}
}
xids[nxids++] = xid;
}
pfree(buf);
}
}
FreeDir(cldir);
if (xids_p)
{
*xids_p = xids;
*nxids_p = nxids;
}
return result;
}
@@ -1636,6 +1731,7 @@ RecoverPreparedTransactions(void)
char dir[MAXPGPATH];
DIR *cldir;
struct dirent *clde;
bool overwriteOK = false;
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
@@ -1666,7 +1762,7 @@ RecoverPreparedTransactions(void)
}
/* Read and validate file */
buf = ReadTwoPhaseFile(xid);
buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
{
ereport(WARNING,
@@ -1687,6 +1783,15 @@ RecoverPreparedTransactions(void)
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
/*
* It's possible that SubTransSetParent has been set before, if the
* prepared transaction generated xid assignment records. Test
* here must match one used in AssignTransactionId().
*/
if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
overwriteOK = true;
/*
* Reconstruct subtrans state for the transaction --- needed
@@ -1696,7 +1801,7 @@ RecoverPreparedTransactions(void)
* hierarchy, but there's no need to restore that exactly.
*/
for (i = 0; i < hdr->nsubxacts; i++)
SubTransSetParent(subxids[i], xid);
SubTransSetParent(subxids[i], xid, overwriteOK);
/*
* Recreate its GXACT and dummy PGPROC
@@ -1719,6 +1824,14 @@ RecoverPreparedTransactions(void)
*/
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
/*
* Release locks held by the standby process after we process each
* prepared transaction. As a result, we don't need too many
* additional locks at any one time.
*/
if (InHotStandby)
StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
pfree(buf);
}
}
@@ -1739,9 +1852,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileNode *rels)
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
XLogRecData rdata[3];
XLogRecData rdata[4];
int lastrdata = 0;
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
@@ -1754,8 +1870,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Emit the XLOG commit record */
xlrec.xid = xid;
xlrec.crec.xact_time = GetCurrentTimestamp();
xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
xlrec.crec.nmsgs = 0;
xlrec.crec.nrels = nrels;
xlrec.crec.nsubxacts = nchildren;
xlrec.crec.nmsgs = ninvalmsgs;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommitPrepared;
rdata[0].buffer = InvalidBuffer;
@@ -1777,6 +1897,15 @@ RecordTransactionCommitPrepared(TransactionId xid,
rdata[2].buffer = InvalidBuffer;
lastrdata = 2;
}
/* dump cache invalidation messages */
if (ninvalmsgs > 0)
{
rdata[lastrdata].next = &(rdata[3]);
rdata[3].data = (char *) invalmsgs;
rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
rdata[lastrdata].next = NULL;
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.10 2009/11/23 09:58:36 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.11 2009/12/19 01:32:33 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -19,14 +19,12 @@
#include "commands/async.h"
#include "pgstat.h"
#include "storage/lock.h"
#include "utils/inval.h"
const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_recover, /* Lock */
NULL, /* Inval */
NULL, /* notify/listen */
NULL, /* pgstat */
multixact_twophase_recover /* MultiXact */
@@ -36,7 +34,6 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_postcommit, /* Lock */
inval_twophase_postcommit, /* Inval */
notify_twophase_postcommit, /* notify/listen */
pgstat_twophase_postcommit, /* pgstat */
multixact_twophase_postcommit /* MultiXact */
@@ -46,8 +43,16 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_postabort, /* Lock */
NULL, /* Inval */
NULL, /* notify/listen */
pgstat_twophase_postabort, /* pgstat */
multixact_twophase_postabort /* MultiXact */
};
const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_standby_recover, /* Lock */
NULL, /* notify/listen */
NULL, /* pgstat */
NULL /* MultiXact */
};

View File

@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.277 2009/12/09 21:57:50 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.278 2009/12/19 01:32:33 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -42,6 +42,7 @@
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
#include "storage/standby.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
@@ -139,6 +140,7 @@ typedef struct TransactionStateData
Oid prevUser; /* previous CurrentUserId setting */
int prevSecContext; /* previous SecurityRestrictionContext */
bool prevXactReadOnly; /* entry-time xact r/o state */
bool startedInRecovery; /* did we start in recovery? */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@@ -167,9 +169,17 @@ static TransactionStateData TopTransactionStateData = {
InvalidOid, /* previous CurrentUserId setting */
0, /* previous SecurityRestrictionContext */
false, /* entry-time xact r/o state */
false, /* startedInRecovery */
NULL /* link to parent state block */
};
/*
* unreportedXids holds XIDs of all subtransactions that have not yet been
* reported in a XLOG_XACT_ASSIGNMENT record.
*/
static int nUnreportedXids;
static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
static TransactionState CurrentTransactionState = &TopTransactionStateData;
/*
@@ -392,6 +402,9 @@ AssignTransactionId(TransactionState s)
bool isSubXact = (s->parent != NULL);
ResourceOwner currentOwner;
if (RecoveryInProgress())
elog(ERROR, "cannot assign TransactionIds during recovery");
/* Assert that caller didn't screw up */
Assert(!TransactionIdIsValid(s->transactionId));
Assert(s->state == TRANS_INPROGRESS);
@@ -414,7 +427,7 @@ AssignTransactionId(TransactionState s)
s->transactionId = GetNewTransactionId(isSubXact);
if (isSubXact)
SubTransSetParent(s->transactionId, s->parent->transactionId);
SubTransSetParent(s->transactionId, s->parent->transactionId, false);
/*
* Acquire lock on the transaction XID. (We assume this cannot block.) We
@@ -435,8 +448,57 @@ AssignTransactionId(TransactionState s)
}
PG_END_TRY();
CurrentResourceOwner = currentOwner;
}
/*
* Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
* top-level transaction we issue a WAL record for the assignment. We
* include the top-level xid and all the subxids that have not yet been
* reported using XLOG_XACT_ASSIGNMENT records.
*
* This is required to limit the amount of shared memory required in a
* hot standby server to keep track of in-progress XIDs. See notes for
* RecordKnownAssignedTransactionIds().
*
* We don't keep track of the immediate parent of each subxid,
* only the top-level transaction that each subxact belongs to. This
* is correct in recovery only because aborted subtransactions are
* separately WAL logged.
*/
if (isSubXact && XLogStandbyInfoActive())
{
unreportedXids[nUnreportedXids] = s->transactionId;
nUnreportedXids++;
/* ensure this test matches similar one in RecoverPreparedTransactions() */
if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
{
XLogRecData rdata[2];
xl_xact_assignment xlrec;
/*
* xtop is always set by now because we recurse up transaction
* stack to the highest unassigned xid and then come back down
*/
xlrec.xtop = GetTopTransactionId();
Assert(TransactionIdIsValid(xlrec.xtop));
xlrec.nsubxacts = nUnreportedXids;
rdata[0].data = (char *) &xlrec;
rdata[0].len = MinSizeOfXactAssignment;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &rdata[1];
rdata[1].data = (char *) unreportedXids;
rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
nUnreportedXids = 0;
}
}
}
/*
* GetCurrentSubTransactionId
@@ -596,6 +658,18 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
return false;
}
/*
* TransactionStartedDuringRecovery
*
* Returns true if the current transaction started while recovery was still
* in progress. Recovery might have ended since so RecoveryInProgress() might
* return false already.
*/
bool
TransactionStartedDuringRecovery(void)
{
return CurrentTransactionState->startedInRecovery;
}
/*
* CommandCounterIncrement
@@ -811,7 +885,7 @@ AtSubStart_ResourceOwner(void)
* This is exported only to support an ugly hack in VACUUM FULL.
*/
TransactionId
RecordTransactionCommit(void)
RecordTransactionCommit(bool isVacuumFull)
{
TransactionId xid = GetTopTransactionIdIfAny();
bool markXidCommitted = TransactionIdIsValid(xid);
@@ -821,11 +895,15 @@ RecordTransactionCommit(void)
bool haveNonTemp;
int nchildren;
TransactionId *children;
int nmsgs;
SharedInvalidationMessage *invalMessages = NULL;
bool RelcacheInitFileInval;
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels, &haveNonTemp);
nchildren = xactGetCommittedChildren(&children);
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
&RelcacheInitFileInval);
/*
* If we haven't been assigned an XID yet, we neither can, nor do we want
* to write a COMMIT record.
@@ -859,13 +937,24 @@ RecordTransactionCommit(void)
/*
* Begin commit critical section and insert the commit XLOG record.
*/
XLogRecData rdata[3];
XLogRecData rdata[4];
int lastrdata = 0;
xl_xact_commit xlrec;
/* Tell bufmgr and smgr to prepare for commit */
BufmgrCommit();
/*
* Set flags required for recovery processing of commits.
*/
xlrec.xinfo = 0;
if (RelcacheInitFileInval)
xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
if (isVacuumFull)
xlrec.xinfo |= XACT_COMPLETION_VACUUM_FULL;
if (forceSyncCommit)
xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
/*
* Mark ourselves as within our "commit critical section". This
* forces any concurrent checkpoint to wait until we've updated
@@ -890,6 +979,7 @@ RecordTransactionCommit(void)
xlrec.xact_time = xactStopTimestamp;
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
rdata[0].buffer = InvalidBuffer;
@@ -911,6 +1001,15 @@ RecordTransactionCommit(void)
rdata[2].buffer = InvalidBuffer;
lastrdata = 2;
}
/* dump shared cache invalidation messages */
if (nmsgs > 0)
{
rdata[lastrdata].next = &(rdata[3]);
rdata[3].data = (char *) invalMessages;
rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
@@ -1352,6 +1451,13 @@ AtSubAbort_childXids(void)
s->childXids = NULL;
s->nChildXids = 0;
s->maxChildXids = 0;
/*
* We could prune the unreportedXids array here. But we don't bother.
* That would potentially reduce number of XLOG_XACT_ASSIGNMENT records
* but it would likely introduce more CPU time into the more common
* paths, so we choose not to do that.
*/
}
/* ----------------------------------------------------------------
@@ -1461,9 +1567,23 @@ StartTransaction(void)
/*
* Make sure we've reset xact state variables
*
* If recovery is still in progress, mark this transaction as read-only.
* We have lower level defences in XLogInsert and elsewhere to stop us
* from modifying data during recovery, but this gives the normal
* indication to the user that the transaction is read-only.
*/
if (RecoveryInProgress())
{
s->startedInRecovery = true;
XactReadOnly = true;
}
else
{
s->startedInRecovery = false;
XactReadOnly = DefaultXactReadOnly;
}
XactIsoLevel = DefaultXactIsoLevel;
XactReadOnly = DefaultXactReadOnly;
forceSyncCommit = false;
MyXactAccessedTempRel = false;
@@ -1475,6 +1595,11 @@ StartTransaction(void)
currentCommandId = FirstCommandId;
currentCommandIdUsed = false;
/*
* initialize reported xid accounting
*/
nUnreportedXids = 0;
/*
* must initialize resource-management stuff first
*/
@@ -1619,7 +1744,7 @@ CommitTransaction(void)
/*
* Here is where we really truly commit.
*/
latestXid = RecordTransactionCommit();
latestXid = RecordTransactionCommit(false);
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
@@ -1853,7 +1978,6 @@ PrepareTransaction(void)
StartPrepare(gxact);
AtPrepare_Notify();
AtPrepare_Inval();
AtPrepare_Locks();
AtPrepare_PgStat();
AtPrepare_MultiXact();
@@ -4199,29 +4323,108 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
/*
* Before 8.5 this was a fairly short function, but now it performs many
* actions for which the order of execution is critical.
*/
static void
xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
{
TransactionId *sub_xids;
SharedInvalidationMessage *inval_msgs;
TransactionId max_xid;
int i;
/* Mark the transaction committed in pg_clog */
/* subxid array follows relfilenodes */
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
/* Make sure nextXid is beyond any XID mentioned in the record */
max_xid = xid;
for (i = 0; i < xlrec->nsubxacts; i++)
{
if (TransactionIdPrecedes(max_xid, sub_xids[i]))
max_xid = sub_xids[i];
}
max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
*
* We don't expect anyone else to modify nextXid, hence we
* don't need to hold a lock while checking this. We still acquire
* the lock to modify it, though.
*/
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = max_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
if (!InHotStandby || XactCompletionVacuumFull(xlrec))
{
/*
* Mark the transaction committed in pg_clog.
*
* If InHotStandby and this is the first commit of a VACUUM FULL INPLACE
* we perform only the actual commit to clog. Strangely, there are two
* commits that share the same xid for every VFI, so we need to skip
* some steps for the first commit. It's OK to repeat the clog update
* when we see the second commit on a VFI.
*/
TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
}
else
{
/*
* If a transaction completion record arrives that has as-yet unobserved
* subtransactions then this will not have been fully handled by the call
* to RecordKnownAssignedTransactionIds() in the main recovery loop in
* xlog.c. So we need to do bookkeeping again to cover that case. This is
* confusing and it is easy to think this call is irrelevant, which has
* happened three times in development already. Leave it in.
*/
RecordKnownAssignedTransactionIds(max_xid);
/*
* Mark the transaction committed in pg_clog. We use async commit
* protocol during recovery to provide information on database
* consistency for when users try to set hint bits. It is important
* that we do not set hint bits until the minRecoveryPoint is past
* this commit record. This ensures that if we crash we don't see
* hint bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks
* as occurs in .
*/
if (xlrec->nmsgs > 0)
{
/*
* Relcache init file invalidation requires processing both
* before and after we send the SI messages. See AtEOXact_Inval()
*/
if (XactCompletionRelcacheInitFileInval(xlrec))
RelationCacheInitFileInvalidate(true);
SendSharedInvalidMessages(inval_msgs, xlrec->nmsgs);
if (XactCompletionRelcacheInitFileInval(xlrec))
RelationCacheInitFileInvalidate(false);
}
/*
* Release locks, if any. We do this for both two phase and normal
* one phase transactions. In effect we are ignoring the prepare
* phase and just going straight to lock release.
*/
StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
}
/* Make sure files supposed to be dropped are dropped */
@@ -4240,8 +4443,31 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
}
smgrclose(srel);
}
/*
* We issue an XLogFlush() for the same reason we emit ForceSyncCommit() in
* normal operation. For example, in DROP DATABASE, we delete all the files
* belonging to the database, and then commit the transaction. If we crash
* after all the files have been deleted but before the commit, you have an
* entry in pg_database without any files. To minimize the window for that,
* we use ForceSyncCommit() to rush the commit record to disk as quick as
* possible. We have the same window during recovery, and forcing an
* XLogFlush() (which updates minRecoveryPoint during recovery) helps
* to reduce that problem window, for any user that requested ForceSyncCommit().
*/
if (XactCompletionForceSyncCommit(xlrec))
XLogFlush(lsn);
}
/*
* Be careful with the order of execution, as with xact_redo_commit().
* The two functions are similar but differ in key places.
*
* Note also that an abort can be for a subtransaction and its children,
* not just for a top level abort. That means we have to consider
* topxid != xid, whereas in commit we would find topxid == xid always
* because subtransaction commit is never WAL logged.
*/
static void
xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
{
@@ -4249,22 +4475,55 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
TransactionId max_xid;
int i;
/* Mark the transaction aborted in pg_clog */
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
/* Make sure nextXid is beyond any XID mentioned in the record */
max_xid = xid;
for (i = 0; i < xlrec->nsubxacts; i++)
{
if (TransactionIdPrecedes(max_xid, sub_xids[i]))
max_xid = sub_xids[i];
}
/* We don't expect anyone else to modify nextXid, hence we
* don't need to hold a lock while checking this. We still acquire
* the lock to modify it, though.
*/
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = max_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
if (InHotStandby)
{
/*
* If a transaction completion record arrives that has as-yet unobserved
* subtransactions then this will not have been fully handled by the call
* to RecordKnownAssignedTransactionIds() in the main recovery loop in
* xlog.c. So we need to do bookkeeping again to cover that case. This is
* confusing and it is easy to think this call is irrelevant, which has
* happened three times in development already. Leave it in.
*/
RecordKnownAssignedTransactionIds(max_xid);
}
/* Mark the transaction aborted in pg_clog, no need for async stuff */
TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
if (InHotStandby)
{
/*
* We must mark clog before we update the ProcArray.
*/
ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids);
/*
* There are no flat files that need updating, nor invalidation
* messages to send or undo.
*/
/*
* Release locks, if any. There are no invalidations to send.
*/
StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
}
/* Make sure files supposed to be dropped are dropped */
@@ -4297,7 +4556,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xact_redo_commit(xlrec, record->xl_xid);
xact_redo_commit(xlrec, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_ABORT)
{
@@ -4315,7 +4574,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
xact_redo_commit(&xlrec->crec, xlrec->xid);
xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
@@ -4325,6 +4584,14 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
xact_redo_abort(&xlrec->arec, xlrec->xid);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
if (InHotStandby)
ProcArrayApplyXidAssignment(xlrec->xtop,
xlrec->nsubxacts, xlrec->xsub);
}
else
elog(PANIC, "xact_redo: unknown op code %u", info);
}
@@ -4333,6 +4600,14 @@ static void
xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{
int i;
TransactionId *xacts;
SharedInvalidationMessage *msgs;
xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
if (XactCompletionRelcacheInitFileInval(xlrec))
appendStringInfo(buf, "; relcache init file inval");
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (xlrec->nrels > 0)
@@ -4348,13 +4623,25 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
}
if (xlrec->nsubxacts > 0)
{
TransactionId *xacts = (TransactionId *)
&xlrec->xnodes[xlrec->nrels];
appendStringInfo(buf, "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
appendStringInfo(buf, " %u", xacts[i]);
}
if (xlrec->nmsgs > 0)
{
appendStringInfo(buf, "; inval msgs:");
for (i = 0; i < xlrec->nmsgs; i++)
{
SharedInvalidationMessage *msg = &msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, "catcache id%d ", msg->id);
else if (msg->id == SHAREDINVALRELCACHE_ID)
appendStringInfo(buf, "relcache ");
else if (msg->id == SHAREDINVALSMGR_ID)
appendStringInfo(buf, "smgr ");
}
}
}
static void
@@ -4385,6 +4672,17 @@ xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
}
}
static void
xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
{
int i;
appendStringInfo(buf, "subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
appendStringInfo(buf, " %u", xlrec->xsub[i]);
}
void
xact_desc(StringInfo buf, uint8 xl_info, char *rec)
{
@@ -4412,16 +4710,28 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
appendStringInfo(buf, "commit %u: ", xlrec->xid);
appendStringInfo(buf, "commit prepared %u: ", xlrec->xid);
xact_desc_commit(buf, &xlrec->crec);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
appendStringInfo(buf, "abort %u: ", xlrec->xid);
appendStringInfo(buf, "abort prepared %u: ", xlrec->xid);
xact_desc_abort(buf, &xlrec->arec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) rec;
/*
* Note that we ignore the WAL record's xid, since we're more
* interested in the top-level xid that issued the record
* and which xids are being reported here.
*/
appendStringInfo(buf, "xid assignment xtop %u: ", xlrec->xtop);
xact_desc_assignment(buf, xlrec);
}
else
appendStringInfo(buf, "UNKNOWN");
}

View File

@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.353 2009/09/13 18:32:07 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.354 2009/12/19 01:32:33 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -67,6 +67,8 @@ int XLOGbuffers = 8;
int XLogArchiveTimeout = 0;
bool XLogArchiveMode = false;
char *XLogArchiveCommand = NULL;
bool XLogRequestRecoveryConnections = true;
int MaxStandbyDelay = 30;
bool fullPageWrites = true;
bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD;
@@ -129,10 +131,16 @@ TimeLineID ThisTimeLineID = 0;
* recovery mode". It should be examined primarily by functions that need
* to act differently when called from a WAL redo function (e.g., to skip WAL
* logging). To check whether the system is in recovery regardless of which
* process you're running in, use RecoveryInProgress().
* process you're running in, use RecoveryInProgress() but only after shared
* memory startup and lock initialization.
*/
bool InRecovery = false;
/* Are we in Hot Standby mode? Only valid in startup process, see xlog.h */
HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec;
/*
* Local copy of SharedRecoveryInProgress variable. True actually means "not
* known, need to check the shared state".
@@ -359,6 +367,8 @@ typedef struct XLogCtlData
/* end+1 of the last record replayed (or being replayed) */
XLogRecPtr replayEndRecPtr;
/* timestamp of last record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
slock_t info_lck; /* locks shared variables shown above */
} XLogCtlData;
@@ -463,6 +473,7 @@ static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI,
uint32 endLogId, uint32 endLogSeg);
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static void CheckRequiredParameterValues(CheckPoint checkPoint);
static void LocalSetXLogInsertAllowed(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
@@ -2103,9 +2114,40 @@ XLogAsyncCommitFlush(void)
bool
XLogNeedsFlush(XLogRecPtr record)
{
/* XLOG doesn't need flushing during recovery */
/*
* During recovery, we don't flush WAL but update minRecoveryPoint
* instead. So "needs flush" is taken to mean whether minRecoveryPoint
* would need to be updated.
*/
if (RecoveryInProgress())
return false;
{
/* Quick exit if already known updated */
if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
return false;
/*
* Update local copy of minRecoveryPoint. But if the lock is busy,
* just return a conservative guess.
*/
if (!LWLockConditionalAcquire(ControlFileLock, LW_SHARED))
return true;
minRecoveryPoint = ControlFile->minRecoveryPoint;
LWLockRelease(ControlFileLock);
/*
* An invalid minRecoveryPoint means that we need to recover all the WAL,
* i.e., we're doing crash recovery. We never modify the control file's
* value in that case, so we can short-circuit future checks here too.
*/
if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
updateMinRecoveryPoint = false;
/* check again */
if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
return false;
else
return true;
}
/* Quick exit if already known flushed */
if (XLByteLE(record, LogwrtResult.Flush))
@@ -3259,10 +3301,11 @@ CleanupBackupHistory(void)
* ignoring them as already applied, but that's not a huge drawback.
*
* If 'cleanup' is true, a cleanup lock is used when restoring blocks.
* Otherwise, a normal exclusive lock is used. At the moment, that's just
* pro forma, because there can't be any regular backends in the system
* during recovery. The 'cleanup' argument applies to all backup blocks
* in the WAL record, that suffices for now.
* Otherwise, a normal exclusive lock is used. During crash recovery, that's
* just pro forma because there can't be any regular backends in the system,
* but in hot standby mode the distinction is important. The 'cleanup'
* argument applies to all backup blocks in the WAL record, that suffices for
* now.
*/
void
RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
@@ -4679,6 +4722,7 @@ BootStrapXLOG(void)
checkPoint.oldestXid = FirstNormalTransactionId;
checkPoint.oldestXidDB = TemplateDbOid;
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -5117,22 +5161,43 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
TimestampTz recordXtime;
/* We only consider stopping at COMMIT or ABORT records */
if (record->xl_rmid != RM_XACT_ID)
return false;
record_info = record->xl_info & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT)
if (record->xl_rmid == RM_XACT_ID)
{
xl_xact_commit *recordXactCommitData;
record_info = record->xl_info & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT)
{
xl_xact_commit *recordXactCommitData;
recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
}
else if (record_info == XLOG_XACT_ABORT)
{
xl_xact_abort *recordXactAbortData;
recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
recordXtime = recordXactAbortData->xact_time;
}
else
return false;
}
else if (record_info == XLOG_XACT_ABORT)
else if (record->xl_rmid == RM_XLOG_ID)
{
xl_xact_abort *recordXactAbortData;
record_info = record->xl_info & ~XLR_INFO_MASK;
if (record_info == XLOG_CHECKPOINT_SHUTDOWN ||
record_info == XLOG_CHECKPOINT_ONLINE)
{
CheckPoint checkPoint;
recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
recordXtime = recordXactAbortData->xact_time;
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
recoveryLastXTime = checkPoint.time;
}
/*
* We don't want to stop recovery on a checkpoint record, but we do
* want to update recoveryLastXTime. So return is unconditional.
*/
return false;
}
else
return false;
@@ -5216,6 +5281,67 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
return stopsHere;
}
/*
* Returns bool with current recovery mode, a global state.
*/
Datum
pg_is_in_recovery(PG_FUNCTION_ARGS)
{
PG_RETURN_BOOL(RecoveryInProgress());
}
/*
* Returns timestamp of last recovered commit/abort record.
*/
TimestampTz
GetLatestXLogTime(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
recoveryLastXTime = xlogctl->recoveryLastXTime;
SpinLockRelease(&xlogctl->info_lck);
return recoveryLastXTime;
}
/*
* Note that text field supplied is a parameter name and does not require translation
*/
#define RecoveryRequiresIntParameter(param_name, currValue, checkpointValue) \
{ \
if (currValue < checkpointValue) \
ereport(ERROR, \
(errmsg("recovery connections cannot continue because " \
"%s = %u is a lower setting than on WAL source server (value was %u)", \
param_name, \
currValue, \
checkpointValue))); \
}
/*
* Check to see if required parameters are set high enough on this server
* for various aspects of recovery operation.
*/
static void
CheckRequiredParameterValues(CheckPoint checkPoint)
{
/* We ignore autovacuum_max_workers when we make this test. */
RecoveryRequiresIntParameter("max_connections",
MaxConnections, checkPoint.MaxConnections);
RecoveryRequiresIntParameter("max_prepared_xacts",
max_prepared_xacts, checkPoint.max_prepared_xacts);
RecoveryRequiresIntParameter("max_locks_per_xact",
max_locks_per_xact, checkPoint.max_locks_per_xact);
if (!checkPoint.XLogStandbyInfoMode)
ereport(ERROR,
(errmsg("recovery connections cannot start because the recovery_connections "
"parameter is disabled on the WAL source server")));
}
/*
* This must be called ONCE during postmaster or standalone-backend startup
*/
@@ -5228,7 +5354,6 @@ StartupXLOG(void)
bool reachedStopPoint = false;
bool haveBackupLabel = false;
XLogRecPtr RecPtr,
LastRec,
checkPointLoc,
backupStopLoc,
EndOfLog;
@@ -5238,6 +5363,7 @@ StartupXLOG(void)
uint32 freespace;
TransactionId oldestActiveXID;
bool bgwriterLaunched = false;
bool backendsAllowed = false;
/*
* Read control file and check XLOG status looks valid.
@@ -5506,6 +5632,38 @@ StartupXLOG(void)
BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
}
/*
* Initialize recovery connections, if enabled. We won't let backends
* in yet, not until we've reached the min recovery point specified
* in control file and we've established a recovery snapshot from a
* running-xacts WAL record.
*/
if (InArchiveRecovery && XLogRequestRecoveryConnections)
{
TransactionId *xids;
int nxids;
CheckRequiredParameterValues(checkPoint);
ereport(LOG,
(errmsg("initializing recovery connections")));
InitRecoveryTransactionEnvironment();
if (wasShutdown)
oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
else
oldestActiveXID = checkPoint.oldestActiveXid;
Assert(TransactionIdIsValid(oldestActiveXID));
/* Startup commit log and related stuff */
StartupCLOG();
StartupSUBTRANS(oldestActiveXID);
StartupMultiXact();
ProcArrayInitRecoveryInfo(oldestActiveXID);
}
/* Initialize resource managers */
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
{
@@ -5580,7 +5738,9 @@ StartupXLOG(void)
do
{
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
if (XLOG_DEBUG ||
(rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
(rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))
{
StringInfoData buf;
@@ -5608,27 +5768,29 @@ StartupXLOG(void)
}
/*
* Check if we were requested to exit without finishing
* recovery.
*/
if (shutdown_requested)
proc_exit(1);
/*
* Have we passed our safe starting point? If so, we can tell
* postmaster that the database is consistent now.
* Have we passed our safe starting point?
*/
if (!reachedMinRecoveryPoint &&
XLByteLT(minRecoveryPoint, EndRecPtr))
XLByteLE(minRecoveryPoint, EndRecPtr))
{
reachedMinRecoveryPoint = true;
if (InArchiveRecovery)
{
ereport(LOG,
(errmsg("consistent recovery state reached")));
if (IsUnderPostmaster)
SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
}
ereport(LOG,
(errmsg("consistent recovery state reached at %X/%X",
EndRecPtr.xlogid, EndRecPtr.xrecoff)));
}
/*
* Have we got a valid starting snapshot that will allow
* queries to be run? If so, we can tell postmaster that
* the database is consistent now, enabling connections.
*/
if (standbyState == STANDBY_SNAPSHOT_READY &&
!backendsAllowed &&
reachedMinRecoveryPoint &&
IsUnderPostmaster)
{
backendsAllowed = true;
SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
}
/*
@@ -5662,8 +5824,13 @@ StartupXLOG(void)
*/
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->replayEndRecPtr = EndRecPtr;
xlogctl->recoveryLastXTime = recoveryLastXTime;
SpinLockRelease(&xlogctl->info_lck);
/* In Hot Standby mode, keep track of XIDs we've seen */
if (InHotStandby && TransactionIdIsValid(record->xl_xid))
RecordKnownAssignedTransactionIds(record->xl_xid);
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
/* Pop the error context stack */
@@ -5810,7 +5977,7 @@ StartupXLOG(void)
}
/* Pre-scan prepared transactions to find out the range of XIDs present */
oldestActiveXID = PrescanPreparedTransactions();
oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
if (InRecovery)
{
@@ -5891,14 +6058,27 @@ StartupXLOG(void)
ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
/* Start up the commit log and related stuff, too */
StartupCLOG();
StartupSUBTRANS(oldestActiveXID);
StartupMultiXact();
/*
* Start up the commit log and related stuff, too. In hot standby mode
* we did this already before WAL replay.
*/
if (standbyState == STANDBY_DISABLED)
{
StartupCLOG();
StartupSUBTRANS(oldestActiveXID);
StartupMultiXact();
}
/* Reload shared-memory state for prepared transactions */
RecoverPreparedTransactions();
/*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions(), see notes for lock_twophase_recover()
*/
if (standbyState != STANDBY_DISABLED)
ShutdownRecoveryTransactionEnvironment();
/* Shut down readFile facility, free space */
if (readFile >= 0)
{
@@ -5964,8 +6144,9 @@ RecoveryInProgress(void)
/*
* Initialize TimeLineID and RedoRecPtr when we discover that recovery
* is finished. (If you change this, see also
* LocalSetXLogInsertAllowed.)
* is finished. InitPostgres() relies upon this behaviour to ensure
* that InitXLOGAccess() is called at backend startup. (If you change
* this, see also LocalSetXLogInsertAllowed.)
*/
if (!LocalRecoveryInProgress)
InitXLOGAccess();
@@ -6151,7 +6332,7 @@ InitXLOGAccess(void)
{
/* ThisTimeLineID doesn't change so we need no lock to copy it */
ThisTimeLineID = XLogCtl->ThisTimeLineID;
Assert(ThisTimeLineID != 0);
Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
(void) GetRedoRecPtr();
@@ -6449,6 +6630,12 @@ CreateCheckPoint(int flags)
MemSet(&checkPoint, 0, sizeof(checkPoint));
checkPoint.time = (pg_time_t) time(NULL);
/* Set important parameter values for use when replaying WAL */
checkPoint.MaxConnections = MaxConnections;
checkPoint.max_prepared_xacts = max_prepared_xacts;
checkPoint.max_locks_per_xact = max_locks_per_xact;
checkPoint.XLogStandbyInfoMode = XLogStandbyInfoActive();
/*
* We must hold WALInsertLock while examining insert state to determine
* the checkpoint REDO pointer.
@@ -6624,6 +6811,21 @@ CreateCheckPoint(int flags)
CheckPointGuts(checkPoint.redo, flags);
/*
* Take a snapshot of running transactions and write this to WAL.
* This allows us to reconstruct the state of running transactions
* during archive recovery, if required. Skip, if this info disabled.
*
* If we are shutting down, or Startup process is completing crash
* recovery we don't need to write running xact data.
*
* Update checkPoint.nextXid since we have a later value
*/
if (!shutdown && XLogStandbyInfoActive())
LogStandbySnapshot(&checkPoint.oldestActiveXid, &checkPoint.nextXid);
else
checkPoint.oldestActiveXid = InvalidTransactionId;
START_CRIT_SECTION();
/*
@@ -6791,7 +6993,7 @@ RecoveryRestartPoint(const CheckPoint *checkPoint)
if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
if (!(RmgrTable[rmid].rm_safe_restartpoint()))
{
elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
elog(trace_recovery(DEBUG2), "RM %d not safe to record restart point at %X/%X",
rmid,
checkPoint->redo.xlogid,
checkPoint->redo.xrecoff);
@@ -6923,14 +7125,9 @@ CreateRestartPoint(int flags)
LogCheckpointEnd(true);
ereport((log_checkpoints ? LOG : DEBUG2),
(errmsg("recovery restart point at %X/%X",
lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
/* XXX this is currently BROKEN because we are in the wrong process */
if (recoveryLastXTime)
ereport((log_checkpoints ? LOG : DEBUG2),
(errmsg("last completed transaction was at log time %s",
timestamptz_to_str(recoveryLastXTime))));
(errmsg("recovery restart point at %X/%X with latest known log time %s",
lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff,
timestamptz_to_str(GetLatestXLogTime()))));
LWLockRelease(CheckpointLock);
return true;
@@ -7036,6 +7233,19 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ShmemVariableCache->oldestXid = checkPoint.oldestXid;
ShmemVariableCache->oldestXidDB = checkPoint.oldestXidDB;
/* Check to see if any changes to max_connections give problems */
if (standbyState != STANDBY_DISABLED)
CheckRequiredParameterValues(checkPoint);
if (standbyState >= STANDBY_INITIALIZED)
{
/*
* Remove stale transactions, if any.
*/
ExpireOldKnownAssignedTransactionIds(checkPoint.nextXid);
StandbyReleaseOldLocks(checkPoint.nextXid);
}
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
@@ -7114,7 +7324,7 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "checkpoint: redo %X/%X; "
"tli %u; xid %u/%u; oid %u; multi %u; offset %u; "
"oldest xid %u in DB %u; %s",
"oldest xid %u in DB %u; oldest running xid %u; %s",
checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
checkpoint->ThisTimeLineID,
checkpoint->nextXidEpoch, checkpoint->nextXid,
@@ -7123,6 +7333,7 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
checkpoint->nextMultiOffset,
checkpoint->oldestXid,
checkpoint->oldestXidDB,
checkpoint->oldestActiveXid,
(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
}
else if (info == XLOG_NOOP)
@@ -7155,6 +7366,9 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
record->xl_prev.xlogid, record->xl_prev.xrecoff,
record->xl_xid);
appendStringInfo(buf, "; len %u",
record->xl_len);
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
if (record->xl_info & XLR_SET_BKP_BLOCK(i))
@@ -7311,6 +7525,12 @@ pg_start_backup(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to run a backup")));
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
if (!XLogArchivingActive())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -7498,6 +7718,12 @@ pg_stop_backup(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to run a backup"))));
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
if (!XLogArchivingActive())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -7659,6 +7885,12 @@ pg_switch_xlog(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to switch transaction log files"))));
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
switchpoint = RequestXLogSwitch();
/*
@@ -7681,6 +7913,12 @@ pg_current_xlog_location(PG_FUNCTION_ARGS)
{
char location[MAXFNAMELEN];
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
/* Make sure we have an up-to-date local LogwrtResult */
{
/* use volatile pointer to prevent code rearrangement */
@@ -7708,6 +7946,12 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
XLogRecPtr current_recptr;
char location[MAXFNAMELEN];
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
/*
* Get the current end-of-WAL position ... shared lock is sufficient
*/