diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b75dab5e10f..063355c46a2 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -406,7 +406,6 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ int XLogCacheBlck; /* highest allocated xlog buffer index */ TimeLineID ThisTimeLineID; - TimeLineID RecoveryTargetTLI; /* * archiveCleanupCommand is read from recovery.conf but needs to be in @@ -455,14 +454,14 @@ typedef struct XLogCtlData XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* current effective recovery target timeline */ + TimeLineID RecoveryTargetTLI; /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery */ TimestampTz currentChunkStartTime; - /* end of the last record restored from the archive */ - XLogRecPtr restoreLastRecPtr; /* Are we requested to pause recovery? */ bool recoveryPause; @@ -2880,19 +2879,6 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, if (reload) WalSndRqstFileReload(); - /* - * Calculate the end location of the restored WAL file and save it in - * shmem. It's used as current standby flush position, and cascading - * walsenders try to send WAL records up to this location. - */ - endptr.xlogid = log; - endptr.xrecoff = seg * XLogSegSize; - XLByteAdvance(endptr, XLogSegSize); - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->restoreLastRecPtr = endptr; - SpinLockRelease(&xlogctl->info_lck); - /* Signal walsender that new WAL has arrived */ if (AllowCascadeReplication()) WalSndWakeup(); @@ -4467,12 +4453,17 @@ rescanLatestTimeLine(void) ThisTimeLineID))); else { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + /* Switch target */ recoveryTargetTLI = newtarget; list_free(expectedTLIs); expectedTLIs = newExpectedTLIs; - XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RecoveryTargetTLI = recoveryTargetTLI; + SpinLockRelease(&xlogctl->info_lck); ereport(LOG, (errmsg("new target timeline is %u", @@ -7519,13 +7510,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) } /* - * GetRecoveryTargetTLI - get the recovery target timeline ID + * GetRecoveryTargetTLI - get the current recovery target timeline ID */ TimeLineID GetRecoveryTargetTLI(void) { - /* RecoveryTargetTLI doesn't change so we need no lock to copy it */ - return XLogCtl->RecoveryTargetTLI; + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + TimeLineID result; + + SpinLockAcquire(&xlogctl->info_lck); + result = xlogctl->RecoveryTargetTLI; + SpinLockRelease(&xlogctl->info_lck); + + return result; } /* @@ -8321,7 +8319,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ - endptr = GetStandbyFlushRecPtr(); + endptr = GetStandbyFlushRecPtr(NULL); KeepLogSeg(endptr, &_logId, &_logSeg); PrevLogSeg(_logId, _logSeg); @@ -9837,14 +9835,13 @@ do_pg_abort_backup(void) /* * Get latest redo apply position. * - * Optionally, returns the end byte position of the last restored - * WAL segment. Callers not interested in that value may pass - * NULL for restoreLastRecPtr. + * Optionally, returns the current recovery target timeline. Callers not + * interested in that may pass NULL for targetTLI. * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr -GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) +GetXLogReplayRecPtr(TimeLineID *targetTLI) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -9852,8 +9849,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->recoveryLastRecPtr; - if (restoreLastRecPtr) - *restoreLastRecPtr = xlogctl->restoreLastRecPtr; + if (targetTLI) + *targetTLI = xlogctl->RecoveryTargetTLI; SpinLockRelease(&xlogctl->info_lck); return recptr; @@ -9862,21 +9859,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) /* * Get current standby flush position, ie, the last WAL position * known to be fsync'd to disk in standby. + * + * If 'targetTLI' is not NULL, it's set to the current recovery target + * timeline. */ XLogRecPtr -GetStandbyFlushRecPtr(void) +GetStandbyFlushRecPtr(TimeLineID *targetTLI) { XLogRecPtr receivePtr; XLogRecPtr replayPtr; - XLogRecPtr restorePtr; receivePtr = GetWalRcvWriteRecPtr(NULL); - replayPtr = GetXLogReplayRecPtr(&restorePtr); + replayPtr = GetXLogReplayRecPtr(targetTLI); if (XLByteLT(receivePtr, replayPtr)) - return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr; + return replayPtr; else - return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr; + return receivePtr; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3f060b82c09..064ddd54953 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -299,7 +299,7 @@ IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); + logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); @@ -1144,7 +1144,31 @@ XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ - SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); + if (am_cascading_walsender) + { + TimeLineID currentTargetTLI; + SendRqstPtr = GetStandbyFlushRecPtr(¤tTargetTLI); + + /* + * If the recovery target timeline changed, bail out. It's a bit + * unfortunate that we have to just disconnect, but there is no way + * to tell the client that the timeline changed. We also don't know + * exactly where the switch happened, so we cannot safely try to send + * up to the switchover point before disconnecting. + */ + if (currentTargetTLI != ThisTimeLineID) + { + if (!walsender_ready_to_stop) + ereport(LOG, + (errmsg("terminating walsender process to force cascaded standby " + "to update timeline and reconnect"))); + walsender_ready_to_stop = true; + *caughtup = true; + return; + } + } + else + SendRqstPtr = GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index df5f232eeea..4d4558ddb67 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -286,8 +286,8 @@ extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); -extern XLogRecPtr GetStandbyFlushRecPtr(void); +extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI); +extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); extern bool RecoveryIsPaused(void);