1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-29 10:41:53 +03:00

Introduce Streaming Replication.

This includes two new kinds of postmaster processes, walsenders and
walreceiver. Walreceiver is responsible for connecting to the primary server
and streaming WAL to disk, while walsender runs in the primary server and
streams WAL from disk to the client.

Documentation still needs work, but the basics are there. We will probably
pull the replication section to a new chapter later on, as well as the
sections describing file-based replication. But let's do that as a separate
patch, so that it's easier to see what has been added/changed. This patch
also adds a new section to the chapter about FE/BE protocol, documenting the
protocol used by walsender/walreceivxer.

Bump catalog version because of two new functions,
pg_last_xlog_receive_location() and pg_last_xlog_replay_location(), for
monitoring the progress of replication.

Fujii Masao, with additional hacking by me
This commit is contained in:
Heikki Linnakangas
2010-01-15 09:19:10 +00:00
parent 4cbe473938
commit 40f908bdcd
53 changed files with 3567 additions and 220 deletions

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.357 2010/01/04 12:50:49 heikki Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.358 2010/01/15 09:19:00 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@ -41,6 +41,8 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@ -141,6 +143,16 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec;
/*
* Are we doing recovery from XLOG stream? If so, we recover without using
* offline XLOG archives even though InArchiveRecovery==true. This flag is
* used only in standby mode.
*/
static bool InStreamingRecovery = false;
/* The current log page is partially-filled, and so needs to be read again? */
static bool needReread = false;
/*
* Local copy of SharedRecoveryInProgress variable. True actually means "not
* known, need to check the shared state".
@ -165,7 +177,7 @@ static bool InArchiveRecovery = false;
/* Was the last xlog file restored from archive, or local? */
static bool restoredFromArchive = false;
/* options taken from recovery.conf */
/* options taken from recovery.conf for archive recovery */
static char *recoveryRestoreCommand = NULL;
static char *recoveryEndCommand = NULL;
static bool recoveryTarget = false;
@ -175,6 +187,11 @@ static TransactionId recoveryTargetXid;
static TimestampTz recoveryTargetTime;
static TimestampTz recoveryLastXTime = 0;
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyMode = false;
static char *PrimaryConnInfo = NULL;
char *TriggerFile = NULL;
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
static TransactionId recoveryStopXid;
static TimestampTz recoveryStopTime;
@ -229,6 +246,18 @@ XLogRecPtr XactLastRecEnd = {0, 0};
*/
static XLogRecPtr RedoRecPtr;
/*
* RedoStartLSN points to the checkpoint's REDO location which is specified
* in a backup label file, backup history file or control file. In standby
* mode, XLOG streaming usually starts from the position where an invalid
* record was found. But if we fail to read even the initial checkpoint
* record, we use the REDO location instead of the checkpoint location as
* the start position of XLOG streaming. Otherwise we would have to jump
* backwards to the REDO location after reading the checkpoint record,
* because the REDO record can precede the checkpoint record.
*/
static XLogRecPtr RedoStartLSN = {0, 0};
/*----------
* Shared-memory data structures for XLOG control
*
@ -349,6 +378,7 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */
TimeLineID ThisTimeLineID;
TimeLineID RecoveryTargetTLI;
/*
* SharedRecoveryInProgress indicates if we're still in crash or archive
@ -369,6 +399,8 @@ typedef struct XLogCtlData
XLogRecPtr replayEndRecPtr;
/* timestamp of last record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
/* end+1 of the last record replayed */
XLogRecPtr recoveryLastRecPtr;
slock_t info_lck; /* locks shared variables shown above */
} XLogCtlData;
@ -481,12 +513,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
static bool AdvanceXLInsertBuffer(bool new_segment);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
static int XLogFileOpen(uint32 log, uint32 seg);
static int XLogFileRead(uint32 log, uint32 seg, int emode);
static void XLogFileClose(void);
static bool RestoreArchivedFile(char *path, const char *xlogfname,
@ -497,6 +526,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
@ -513,7 +543,6 @@ static char *str_time(pg_time_t tnow);
#ifdef WAL_DEBUG
static void xlog_outrec(StringInfo buf, XLogRecord *record);
#endif
static void issue_xlog_fsync(void);
static void pg_start_backup_callback(int code, Datum arg);
static bool read_backup_label(XLogRecPtr *checkPointLoc);
static void rm_redo_error_callback(void *arg);
@ -1690,7 +1719,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
*/
if (finishing_seg || (xlog_switch && last_iteration))
{
issue_xlog_fsync();
issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
@ -1754,7 +1783,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
openLogFile = XLogFileOpen(openLogId, openLogSeg);
openLogOff = 0;
}
issue_xlog_fsync();
issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
}
LogwrtResult.Flush = LogwrtResult.Write;
}
@ -2189,7 +2218,7 @@ XLogNeedsFlush(XLogRecPtr record)
* take down the system on failure). They will promote to PANIC if we are
* in a critical section.
*/
static int
int
XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock)
{
@ -2536,7 +2565,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
/*
* Open a pre-existing logfile segment for writing.
*/
static int
int
XLogFileOpen(uint32 log, uint32 seg)
{
char path[MAXPGPATH];
@ -2586,7 +2615,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
XLogFileName(xlogfname, tli, log, seg);
if (InArchiveRecovery)
if (InArchiveRecovery && !InStreamingRecovery)
{
/* Report recovery progress in PS display */
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
@ -2641,12 +2670,13 @@ XLogFileClose(void)
/*
* WAL segment files will not be re-read in normal operation, so we advise
* the OS to release any cached pages. But do not do so if WAL archiving
* is active, because archiver process could use the cache to read the WAL
* segment. Also, don't bother with it if we are using O_DIRECT, since
* the kernel is presumably not caching in that case.
* or streaming is active, because archiver and walsender process could use
* the cache to read the WAL segment. Also, don't bother with it if we
* are using O_DIRECT, since the kernel is presumably not caching in that
* case.
*/
#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
if (!XLogArchivingActive() &&
if (!XLogIsNeeded() &&
(get_sync_bit(sync_method) & PG_O_DIRECT) == 0)
(void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
#endif
@ -2689,6 +2719,10 @@ RestoreArchivedFile(char *path, const char *xlogfname,
uint32 restartLog;
uint32 restartSeg;
/* In standby mode, restore_command might not be supplied */
if (StandbyMode && recoveryRestoreCommand == NULL)
goto not_available;
/*
* When doing archive recovery, we always prefer an archived log file even
* if a file of the same name exists in XLOGDIR. The reason is that the
@ -2913,6 +2947,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
(errmsg("could not restore file \"%s\" from archive: return code %d",
xlogfname, rc)));
not_available:
/*
* if an archived file is not available, there might still be a version of
* this file in XLOGDIR, so return that as the filename to open.
@ -3117,7 +3152,18 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
{
if (XLogArchiveCheckDone(xlde->d_name))
/*
* Normally we don't delete old XLOG files during recovery to
* avoid accidentally deleting a file that looks stale due to a
* bug or hardware issue, but in fact contains important data.
* During streaming recovery, however, we will eventually fill the
* disk if we never clean up, so we have to. That's not an issue
* with file-based archive recovery because in that case we
* restore one XLOG file at a time, on-demand, and with a
* different filename that can't be confused with regular XLOG
* files.
*/
if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
@ -3427,6 +3473,79 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
return true;
}
/*
* Attempt to fetch an XLOG record.
*
* If RecPtr is not NULL, try to fetch a record at that position. Otherwise
* try to fetch a record just after the last one previously read.
*
* In standby mode, if we failed in reading a valid record and are not doing
* recovery from XLOG stream yet, we ignore the failure and start walreceiver
* process to fetch the record from the primary. Otherwise, returns NULL,
* or fails if emode is PANIC. (emode must be either PANIC or LOG.)
*
* If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
* this case, if we have to start XLOG streaming, we use RedoStartLSN as the
* streaming start position instead of RecPtr.
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
*/
static XLogRecord *
FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
{
if (StandbyMode && !InStreamingRecovery)
{
XLogRecord *record;
XLogRecPtr startlsn;
bool haveNextRecord = (nextRecord != NULL);
/* An invalid record is OK here, so we set emode to DEBUG2 */
record = ReadRecord(RecPtr, DEBUG2);
if (record != NULL)
return record;
/*
* Start XLOG streaming if there is no more valid records available
* in the archive.
*
* We need to calculate the start position of XLOG streaming. If we
* read a record in the middle of a segment which doesn't exist in
* pg_xlog, we use the start of the segment as the start position.
* That prevents a broken segment (i.e., with no records in the
* first half of a segment) from being created by XLOG streaming,
* which might cause trouble later on if the segment is e.g
* archived.
*/
startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
if (startlsn.xrecoff % XLogSegSize != 0)
{
char xlogpath[MAXPGPATH];
struct stat stat_buf;
uint32 log;
uint32 seg;
XLByteToSeg(startlsn, log, seg);
XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
if (stat(xlogpath, &stat_buf) != 0)
startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
}
RequestXLogStreaming(startlsn, PrimaryConnInfo);
/* Needs to read the current page again if the next record is in it */
needReread = haveNextRecord;
nextRecord = NULL;
InStreamingRecovery = true;
ereport(LOG,
(errmsg("starting streaming recovery at %X/%X",
startlsn.xlogid, startlsn.xrecoff)));
}
return ReadRecord(RecPtr, emode);
}
/*
* Attempt to read an XLOG record.
*
@ -3434,13 +3553,13 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
* try to read a record just after the last one previously read.
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC or LOG.)
* (emode must be either PANIC, LOG or DEBUG2.)
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
*/
static XLogRecord *
ReadRecord(XLogRecPtr *RecPtr, int emode)
ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
{
XLogRecord *record;
char *buffer;
@ -3451,6 +3570,19 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
uint32 targetPageOff;
uint32 targetRecOff;
uint32 pageHeaderSize;
XLogRecPtr receivedUpto = {0,0};
bool finished;
int emode;
/*
* We don't expect any invalid records during streaming recovery: we
* should never hit the end of WAL because we wait for it to be streamed.
* Therefore treat any broken WAL as PANIC, instead of failing over.
*/
if (InStreamingRecovery)
emode = PANIC;
else
emode = emode_arg;
if (readBuf == NULL)
{
@ -3474,14 +3606,13 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
record = nextRecord;
goto got_record;
}
/* align old recptr to next page */
if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
if (tmpRecPtr.xrecoff >= XLogFileSize)
{
(tmpRecPtr.xlogid)++;
tmpRecPtr.xrecoff = 0;
}
/*
* Align old recptr to next page if the current page is filled and
* doesn't need to be read again.
*/
if (!needReread)
NextLogPage(tmpRecPtr);
/* We will account for page header size below */
}
else
@ -3507,6 +3638,21 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
close(readFile);
readFile = -1;
}
/* Is the target record ready yet? */
if (InStreamingRecovery)
{
receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
if (finished)
{
if (emode_arg == PANIC)
ereport(PANIC,
(errmsg("streaming recovery ended")));
else
return NULL;
}
}
XLByteToSeg(*RecPtr, readId, readSeg);
if (readFile < 0)
{
@ -3539,9 +3685,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
}
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
if (readOff != targetPageOff)
if (readOff != targetPageOff || needReread)
{
readOff = targetPageOff;
needReread = false;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
ereport(emode,
@ -3697,6 +3844,7 @@ got_record:;
{
/* Need to reassemble record */
XLogContRecord *contrecord;
XLogRecPtr nextpagelsn = *RecPtr;
uint32 gotlen = len;
memcpy(buffer, record, len);
@ -3704,6 +3852,23 @@ got_record:;
buffer += len;
for (;;)
{
/* Is the next page ready yet? */
if (InStreamingRecovery)
{
if (gotlen != len)
nextpagelsn.xrecoff += XLOG_BLCKSZ;
NextLogPage(nextpagelsn);
receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
if (finished)
{
if (emode_arg == PANIC)
ereport(PANIC,
(errmsg("streaming recovery ended")));
else
return NULL;
}
}
readOff += XLOG_BLCKSZ;
if (readOff >= XLogSegSize)
{
@ -3768,6 +3933,21 @@ got_record:;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
/*
* Check whether the current page needs to be read again. If there is no
* unread record in the current page (nextRecord == NULL), obviously we
* don't need to reread it. If we're not in streaming recovery mode yet,
* partially-filled page doesn't need to be reread because it is the
* last valid page.
*/
if (nextRecord != NULL && InStreamingRecovery &&
XLByteLE(receivedUpto, EndRecPtr))
{
nextRecord = NULL;
needReread = true;
}
ReadRecPtr = *RecPtr;
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record;
@ -3781,6 +3961,21 @@ got_record:;
nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
/*
* Check whether the current page needs to be read again. If there is no
* unread record in the current page (nextRecord == NULL), obviously we
* don't need to reread it. If we're not in streaming recovery mode yet,
* partially-filled page doesn't need to be reread because it is the last
* valid page.
*/
if (nextRecord != NULL && InStreamingRecovery &&
XLByteLE(receivedUpto, EndRecPtr))
{
nextRecord = NULL;
needReread = true;
}
ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len);
@ -3793,6 +3988,7 @@ got_record:;
EndRecPtr.xrecoff += XLogSegSize - 1;
EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
nextRecord = NULL; /* definitely not on same page */
needReread = false;
/*
* Pretend that readBuf contains the last page of the segment. This is
@ -4586,6 +4782,16 @@ UpdateControlFile(void)
errmsg("could not close control file: %m")));
}
/*
* Returns the unique system identifier from control file.
*/
uint64
GetSystemIdentifier(void)
{
Assert(ControlFile != NULL);
return ControlFile->system_identifier;
}
/*
* Initialization of shared memory for XLOG
*/
@ -4822,7 +5028,7 @@ str_time(pg_time_t tnow)
/*
* See if there is a recovery command file (recovery.conf), and if so
* read in parameters for archive recovery.
* read in parameters for archive recovery and XLOG streaming.
*
* XXX longer term intention is to expand this to
* cater for additional parameters and controls
@ -4974,6 +5180,29 @@ readRecoveryCommandFile(void)
ereport(LOG,
(errmsg("recovery_target_inclusive = %s", tok2)));
}
else if (strcmp(tok1, "standby_mode") == 0)
{
if (!parse_bool(tok2, &StandbyMode))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("parameter \"standby_mode\" requires a Boolean value")));
ereport(LOG,
(errmsg("standby_mode = '%s'", tok2)));
}
else if (strcmp(tok1, "primary_conninfo") == 0)
{
PrimaryConnInfo = pstrdup(tok2);
ereport(LOG,
(errmsg("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
else if (strcmp(tok1, "trigger_file") == 0)
{
TriggerFile = pstrdup(tok2);
ereport(LOG,
(errmsg("trigger_file = '%s'",
TriggerFile)));
}
else
ereport(FATAL,
(errmsg("unrecognized recovery parameter \"%s\"",
@ -4988,10 +5217,10 @@ readRecoveryCommandFile(void)
cmdline),
errhint("Lines should have the format parameter = 'value'.")));
/* Check that required parameters were supplied */
if (recoveryRestoreCommand == NULL)
/* If not in standby mode, restore_command must be supplied */
if (!StandbyMode && recoveryRestoreCommand == NULL)
ereport(FATAL,
(errmsg("recovery command file \"%s\" did not specify restore_command",
(errmsg("recovery command file \"%s\" did not specify restore_command nor standby_mode",
RECOVERY_COMMAND_FILE)));
/* Enable fetching from archive recovery area */
@ -5452,6 +5681,9 @@ StartupXLOG(void)
recoveryTargetTLI,
ControlFile->checkPointCopy.ThisTimeLineID)));
/* Save the selected recovery target timeline ID in shared memory */
XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
if (read_backup_label(&checkPointLoc))
{
/*
@ -5482,6 +5714,7 @@ StartupXLOG(void)
* to pg_control is broken, try the next-to-last one.
*/
checkPointLoc = ControlFile->checkPoint;
RedoStartLSN = ControlFile->checkPointCopy.redo;
record = ReadCheckpointRecord(checkPointLoc, 1);
if (record != NULL)
{
@ -5489,6 +5722,15 @@ StartupXLOG(void)
(errmsg("checkpoint record is at %X/%X",
checkPointLoc.xlogid, checkPointLoc.xrecoff)));
}
else if (InStreamingRecovery)
{
/*
* The last valid checkpoint record required for a streaming
* recovery exists in neither standby nor the primary.
*/
ereport(PANIC,
(errmsg("could not locate a valid checkpoint record")));
}
else
{
checkPointLoc = ControlFile->prevCheckPoint;
@ -5688,12 +5930,12 @@ StartupXLOG(void)
if (XLByteLT(checkPoint.redo, RecPtr))
{
/* back up to find the record */
record = ReadRecord(&(checkPoint.redo), PANIC);
record = FetchRecord(&(checkPoint.redo), PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
record = ReadRecord(NULL, LOG);
record = FetchRecord(NULL, LOG, false);
}
if (record != NULL)
@ -5706,9 +5948,10 @@ StartupXLOG(void)
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
/* initialize shared replayEndRecPtr */
/* initialize shared replayEndRecPtr and recoveryLastRecPtr */
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->recoveryLastRecPtr = ReadRecPtr;
SpinLockRelease(&xlogctl->info_lck);
InRedo = true;
@ -5762,14 +6005,8 @@ StartupXLOG(void)
}
#endif
/*
* Check if we were requested to re-read config file.
*/
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/* Handle interrupt signals of startup process */
HandleStartupProcInterrupts();
/*
* Have we passed our safe starting point?
@ -5841,9 +6078,17 @@ StartupXLOG(void)
/* Pop the error context stack */
error_context_stack = errcontext.previous;
/*
* Update shared recoveryLastRecPtr after this record has been
* replayed.
*/
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->recoveryLastRecPtr = EndRecPtr;
SpinLockRelease(&xlogctl->info_lck);
LastRec = ReadRecPtr;
record = ReadRecord(NULL, LOG);
record = FetchRecord(NULL, LOG, false);
} while (record != NULL && recoveryContinue);
/*
@ -5867,6 +6112,27 @@ StartupXLOG(void)
}
}
/*
* If we launched a WAL receiver, it should be gone by now. It will trump
* over the startup checkpoint and subsequent records if it's still alive,
* so be extra sure that it's gone.
*/
if (WalRcvInProgress())
elog(PANIC, "wal receiver still active");
/*
* We are now done reading the xlog from stream. Turn off streaming
* recovery, and restart fetching the files (which would be required
* at end of recovery, e.g., timeline history file) from archive.
*/
if (InStreamingRecovery)
{
/* We are no longer in streaming recovery state */
InStreamingRecovery = false;
ereport(LOG,
(errmsg("streaming recovery complete")));
}
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
@ -6241,7 +6507,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
return NULL;
}
record = ReadRecord(&RecPtr, LOG);
record = FetchRecord(&RecPtr, LOG, true);
if (record == NULL)
{
@ -6387,6 +6653,26 @@ GetInsertRecPtr(void)
return recptr;
}
/*
* GetWriteRecPtr -- Returns the current write position.
*
* NOTE: The value returned lags behind the real write position. But,
* an approximation is enough for the current usage of this function.
*/
XLogRecPtr
GetWriteRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->LogwrtResult.Write;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}
/*
* Get the time of the last xlog segment switch
*/
@ -6443,6 +6729,16 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
*epoch = ckptXidEpoch;
}
/*
* GetRecoveryTargetTLI - get the recovery target timeline ID
*/
TimeLineID
GetRecoveryTargetTLI(void)
{
/* RecoveryTargetTLI doesn't change so we need no lock to copy it */
return XLogCtl->RecoveryTargetTLI;
}
/*
* This must be called ONCE during postmaster or standalone-backend shutdown
*/
@ -6917,8 +7213,34 @@ CreateCheckPoint(int flags)
smgrpostckpt();
/*
* Delete old log files (those no longer needed even for previous
* checkpoint).
* If there's connected standby servers doing XLOG streaming, don't
* delete XLOG files that have not been streamed to all of them yet.
* This does nothing to prevent them from being deleted when the
* standby is disconnected (e.g because of network problems), but at
* least it avoids an open replication connection from failing because
* of that.
*/
if ((_logId || _logSeg) && MaxWalSenders > 0)
{
XLogRecPtr oldest;
uint32 log;
uint32 seg;
oldest = GetOldestWALSendPointer();
if (oldest.xlogid != 0 || oldest.xrecoff != 0)
{
XLByteToSeg(oldest, log, seg);
if (log < _logId || (log == _logId && seg < _logSeg))
{
_logId = log;
_logSeg = seg;
}
}
}
/*
* Delete old log files (those no longer needed even for
* previous checkpoint or the standbys in XLOG streaming).
*/
if (_logId || _logSeg)
{
@ -7036,6 +7358,8 @@ CreateRestartPoint(int flags)
{
XLogRecPtr lastCheckPointRecPtr;
CheckPoint lastCheckPoint;
uint32 _logId;
uint32 _logSeg;
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
@ -7105,6 +7429,12 @@ CreateRestartPoint(int flags)
CheckPointGuts(lastCheckPoint.redo, flags);
/*
* Select point at which we can truncate the xlog, which we base on the
* prior checkpoint's earliest info.
*/
XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
/*
* Update pg_control, using current time. Check that it still shows
* IN_ARCHIVE_RECOVERY state and an older checkpoint, else do nothing;
@ -7123,6 +7453,34 @@ CreateRestartPoint(int flags)
}
LWLockRelease(ControlFileLock);
/* Are we doing recovery from XLOG stream? */
if (!InStreamingRecovery)
InStreamingRecovery = WalRcvInProgress();
/*
* Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from
* growing full. We don't need do this during normal recovery, but during
* streaming recovery we have to or the disk will eventually fill up from
* old log files streamed from master.
*/
if (InStreamingRecovery && (_logId || _logSeg))
{
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
endptr = GetWalRcvWriteRecPtr();
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr);
/*
* Make more log segments if needed. (Do this after recycling old log
* segments, since that may supply some of the needed files.)
*/
PreallocXlogFiles(endptr);
}
/*
* Currently, there is no need to truncate pg_subtrans during recovery. If
* we did do that, we will need to have called StartupSUBTRANS() already
@ -7495,36 +7853,39 @@ assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
/*
* Issue appropriate kind of fsync (if any) on the current XLOG output file
* Issue appropriate kind of fsync (if any) for an XLOG output file.
*
* 'fd' is a file descriptor for the XLOG file to be fsync'd.
* 'log' and 'seg' are for error reporting purposes.
*/
static void
issue_xlog_fsync(void)
void
issue_xlog_fsync(int fd, uint32 log, uint32 seg)
{
switch (sync_method)
{
case SYNC_METHOD_FSYNC:
if (pg_fsync_no_writethrough(openLogFile) != 0)
if (pg_fsync_no_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync log file %u, segment %u: %m",
openLogId, openLogSeg)));
log, seg)));
break;
#ifdef HAVE_FSYNC_WRITETHROUGH
case SYNC_METHOD_FSYNC_WRITETHROUGH:
if (pg_fsync_writethrough(openLogFile) != 0)
if (pg_fsync_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync write-through log file %u, segment %u: %m",
openLogId, openLogSeg)));
log, seg)));
break;
#endif
#ifdef HAVE_FDATASYNC
case SYNC_METHOD_FDATASYNC:
if (pg_fdatasync(openLogFile) != 0)
if (pg_fdatasync(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fdatasync log file %u, segment %u: %m",
openLogId, openLogSeg)));
log, seg)));
break;
#endif
case SYNC_METHOD_OPEN:
@ -8020,6 +8381,48 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(cstring_to_text(location));
}
/*
* Report the last WAL receive location (same format as pg_start_backup etc)
*
* This is useful for determining how much of WAL is guaranteed to be received
* and synced to disk by walreceiver.
*/
Datum
pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
{
XLogRecPtr recptr;
char location[MAXFNAMELEN];
recptr = GetWalRcvWriteRecPtr();
snprintf(location, sizeof(location), "%X/%X",
recptr.xlogid, recptr.xrecoff);
PG_RETURN_TEXT_P(cstring_to_text(location));
}
/*
* Report the last WAL replay location (same format as pg_start_backup etc)
*
* This is useful for determining how much of WAL is visible to read-only
* connections during recovery.
*/
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
char location[MAXFNAMELEN];
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr;
SpinLockRelease(&xlogctl->info_lck);
snprintf(location, sizeof(location), "%X/%X",
recptr.xlogid, recptr.xrecoff);
PG_RETURN_TEXT_P(cstring_to_text(location));
}
/*
* Compute an xlog file name and decimal byte offset given a WAL location,
* such as is returned by pg_stop_backup() or pg_xlog_switch().
@ -8143,12 +8546,12 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
* point, we will fail to restore a consistent database state.
*
* Returns TRUE if a backup_label was found (and fills the checkpoint
* location into *checkPointLoc); returns FALSE if not.
* location and its REDO location into *checkPointLoc and RedoStartLSN,
* respectively); returns FALSE if not.
*/
static bool
read_backup_label(XLogRecPtr *checkPointLoc)
{
XLogRecPtr startpoint;
char startxlogfilename[MAXFNAMELEN];
TimeLineID tli;
FILE *lfp;
@ -8174,7 +8577,7 @@ read_backup_label(XLogRecPtr *checkPointLoc)
* format).
*/
if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
&startpoint.xlogid, &startpoint.xrecoff, &tli,
&RedoStartLSN.xlogid, &RedoStartLSN.xrecoff, &tli,
startxlogfilename, &ch) != 5 || ch != '\n')
ereport(FATAL,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@ -8319,6 +8722,25 @@ StartupProcShutdownHandler(SIGNAL_ARGS)
shutdown_requested = true;
}
/* Handle SIGHUP and SIGTERM signals of startup process */
void
HandleStartupProcInterrupts(void)
{
/*
* Check if we were requested to re-read config file.
*/
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Check if we were requested to exit without finishing recovery.
*/
if (shutdown_requested)
proc_exit(1);
}
/* Main entry point for startup process */
void
StartupProcessMain(void)