diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6345d0746c0..29604851218 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -709,6 +709,18 @@ typedef struct XLogCtlData XLogRecPtr lastFpwDisableRecPtr; slock_t info_lck; /* locks shared variables shown above */ + + /* + * Variables used to track segment-boundary-crossing WAL records. See + * RegisterSegmentBoundary. Protected by segtrack_lck. + */ + XLogSegNo lastNotifiedSeg; + XLogSegNo earliestSegBoundary; + XLogRecPtr earliestSegBoundaryEndPtr; + XLogSegNo latestSegBoundary; + XLogRecPtr latestSegBoundaryEndPtr; + + slock_t segtrack_lck; /* locks shared variables shown above */ } XLogCtlData; static XLogCtlData *XLogCtl = NULL; @@ -899,6 +911,7 @@ static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecP static void RemoveXlogFile(const char *segname, XLogRecPtr lastredoptr, XLogRecPtr endptr); static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); +static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, @@ -1129,23 +1142,56 @@ XLogInsertRecord(XLogRecData *rdata, END_CRIT_SECTION(); /* - * Update shared LogwrtRqst.Write, if we crossed page boundary. + * If we crossed page boundary, update LogwrtRqst.Write; if we crossed + * segment boundary, register that and wake up walwriter. */ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { + XLogSegNo StartSeg; + XLogSegNo EndSeg; + + XLByteToSeg(StartPos, StartSeg, wal_segment_size); + XLByteToSeg(EndPos, EndSeg, wal_segment_size); + + /* + * Register our crossing the segment boundary if that occurred. + * + * Note that we did not use XLByteToPrevSeg() for determining the + * ending segment. This is so that a record that fits perfectly into + * the end of the segment causes the latter to get marked ready for + * archival immediately. + */ + if (StartSeg != EndSeg && XLogArchivingActive()) + RegisterSegmentBoundary(EndSeg, EndPos); + + /* + * Advance LogwrtRqst.Write so that it includes new block(s). + * + * We do this after registering the segment boundary so that the + * comparison with the flushed pointer below can use the latest value + * known globally. + */ SpinLockAcquire(&XLogCtl->info_lck); - /* advance global request to include new block(s) */ if (XLogCtl->LogwrtRqst.Write < EndPos) XLogCtl->LogwrtRqst.Write = EndPos; /* update local result copy while I have the chance */ LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); + + /* + * There's a chance that the record was already flushed to disk and we + * missed marking segments as ready for archive. If this happens, we + * nudge the WALWriter, which will take care of notifying segments as + * needed. + */ + if (StartSeg != EndSeg && XLogArchivingActive() && + LogwrtResult.Flush >= EndPos && ProcGlobal->walwriterLatch) + SetLatch(ProcGlobal->walwriterLatch); } /* * If this was an XLOG_SWITCH record, flush the record and the empty - * padding space that fills the rest of the segment, and perform - * end-of-segment actions (eg, notifying archiver). + * padding space that fills the rest of the segment. */ if (isLogSwitch) { @@ -2388,6 +2434,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); + Assert(LWLockHeldByMe(WALWriteLock)); /* * Update local LogwrtResult (caller probably did this already, but...) @@ -2524,11 +2571,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) * later. Doing it here ensures that one and only one backend will * perform this fsync. * - * This is also the right place to notify the Archiver that the - * segment is ready to copy to archival storage, and to update the - * timer for archive_timeout, and to signal for a checkpoint if - * too many logfile segments have been used since the last - * checkpoint. + * If WAL archiving is active, we attempt to notify the archiver + * of any segments that are now ready for archival. + * + * This is also the right place to update the timer for + * archive_timeout and to signal for a checkpoint if too many + * logfile segments have been used since the last checkpoint. */ if (finishing_seg) { @@ -2540,7 +2588,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) - XLogArchiveNotifySeg(openLogSegNo); + NotifySegmentsReadyForArchive(LogwrtResult.Flush); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; @@ -2627,6 +2675,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; SpinLockRelease(&XLogCtl->info_lck); } + + if (XLogArchivingActive()) + NotifySegmentsReadyForArchive(LogwrtResult.Flush); } /* @@ -4227,6 +4278,129 @@ ValidateXLOGDirectoryStructure(void) } } +/* + * RegisterSegmentBoundary + * + * WAL records that are split across a segment boundary require special + * treatment for archiving: the initial segment must not be archived until + * the end segment has been flushed, in case we crash before we have + * the chance to flush the end segment (because after recovery we would + * overwrite that WAL record with a different one, and so the file we + * archived no longer represents truth.) This also applies to streaming + * physical replication. + * + * To handle this, we keep track of the LSN of WAL records that cross + * segment boundaries. Two such are sufficient: the ones with the + * earliest and the latest end pointers we know about, since the flush + * position advances monotonically. WAL record writers register + * boundary-crossing records here, which is used by .ready file creation + * to delay until the end segment is known flushed. + */ +static void +RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos) +{ + XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY; + + /* verify caller computed segment number correctly */ + AssertArg((XLByteToSeg(endpos, segno, wal_segment_size), segno == seg)); + + SpinLockAcquire(&XLogCtl->segtrack_lck); + + /* + * If no segment boundaries are registered, store the new segment boundary + * in earliestSegBoundary. Otherwise, store the greater segment + * boundaries in latestSegBoundary. + */ + if (XLogCtl->earliestSegBoundary == MaxXLogSegNo) + { + XLogCtl->earliestSegBoundary = seg; + XLogCtl->earliestSegBoundaryEndPtr = endpos; + } + else if (seg > XLogCtl->earliestSegBoundary && + (XLogCtl->latestSegBoundary == MaxXLogSegNo || + seg > XLogCtl->latestSegBoundary)) + { + XLogCtl->latestSegBoundary = seg; + XLogCtl->latestSegBoundaryEndPtr = endpos; + } + + SpinLockRelease(&XLogCtl->segtrack_lck); +} + +/* + * NotifySegmentsReadyForArchive + * + * Mark segments as ready for archival, given that it is safe to do so. + * This function is idempotent. + */ +void +NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr) +{ + XLogSegNo latest_boundary_seg; + XLogSegNo last_notified; + XLogSegNo flushed_seg; + XLogSegNo seg; + bool keep_latest; + + XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size); + + SpinLockAcquire(&XLogCtl->segtrack_lck); + + if (XLogCtl->latestSegBoundary <= flushed_seg && + XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr) + { + latest_boundary_seg = XLogCtl->latestSegBoundary; + keep_latest = false; + } + else if (XLogCtl->earliestSegBoundary <= flushed_seg && + XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr) + { + latest_boundary_seg = XLogCtl->earliestSegBoundary; + keep_latest = true; + } + else + { + SpinLockRelease(&XLogCtl->segtrack_lck); + return; + } + + last_notified = XLogCtl->lastNotifiedSeg; + + /* + * Update shared memory and discard segment boundaries that are no longer + * needed. + * + * It is safe to update shared memory before we attempt to create the + * .ready files. If our calls to XLogArchiveNotifySeg() fail, + * RemoveOldXlogFiles() will retry it as needed. + */ + if (last_notified < latest_boundary_seg - 1) + XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1; + + if (keep_latest) + { + XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary; + XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr; + } + else + { + XLogCtl->earliestSegBoundary = MaxXLogSegNo; + XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr; + } + + XLogCtl->latestSegBoundary = MaxXLogSegNo; + XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr; + + SpinLockRelease(&XLogCtl->segtrack_lck); + + /* + * Notify archiver about segments that are ready for archival (by creating + * the corresponding .ready files). + */ + for (seg = last_notified + 1; seg < latest_boundary_seg; seg++) + XLogArchiveNotifySeg(seg); +} + /* * Remove previous backup history files. This also retries creation of * .ready files for any backup history files for which XLogArchiveNotify @@ -5112,8 +5286,16 @@ XLOGShmemInit(void) SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); + SpinLockInit(&XLogCtl->segtrack_lck); SpinLockInit(&XLogCtl->ulsn_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); + + /* Initialize stuff for marking segments as ready for archival. */ + XLogCtl->lastNotifiedSeg = MaxXLogSegNo; + XLogCtl->earliestSegBoundary = MaxXLogSegNo; + XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr; + XLogCtl->latestSegBoundary = MaxXLogSegNo; + XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr; } /* @@ -7605,6 +7787,20 @@ StartupXLOG(void) XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; + /* + * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file. + */ + if (XLogArchivingActive()) + { + XLogSegNo EndOfLogSeg; + + XLByteToSeg(EndOfLog, EndOfLogSeg, wal_segment_size); + + SpinLockAcquire(&XLogCtl->segtrack_lck); + XLogCtl->lastNotifiedSeg = EndOfLogSeg - 1; + SpinLockRelease(&XLogCtl->segtrack_lck); + } + /* * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE * record before resource manager writes cleanup WAL records or checkpoint diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index a6fdba3f413..6cf6434fee4 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -256,6 +256,13 @@ WalWriterMain(void) proc_exit(0); /* done */ } + /* + * Notify the archiver of any WAL segments that are ready. We do this + * here to handle a race condition where WAL is flushed to disk prior + * to registering the segment boundary. + */ + NotifySegmentsReadyForArchive(GetFlushRecPtr()); + /* * Do what we're here for; then, if XLogBackgroundFlush() found useful * work to do, reset hibernation counter. diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0bed7e337ce..910200337b1 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -322,6 +322,7 @@ extern XLogRecPtr GetInsertRecPtr(void); extern XLogRecPtr GetFlushRecPtr(void); extern XLogRecPtr GetLastImportantRecPtr(void); extern void RemovePromoteSignalFiles(void); +extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr); extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index daded3dca05..6088ac87c74 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -39,6 +39,7 @@ typedef uint64 XLogRecPtr; * XLogSegNo - physical log file sequence number. */ typedef uint64 XLogSegNo; +#define MaxXLogSegNo ((XLogSegNo) 0xFFFFFFFFFFFFFFFF) /* * TimeLineID (TLI) - identifies different database histories to prevent