mirror of
https://github.com/postgres/postgres.git
synced 2025-08-30 06:01:21 +03:00
Rework WAL-reading supporting structs
The state-tracking of WAL reading in various places was pretty messy, mostly because the ancient physical-replication WAL reading code wasn't using the XLogReader abstraction. This led to some untidy code. Make it prettier by creating two additional supporting structs, WALSegmentContext and WALOpenSegment which keep track of WAL-reading state. This makes code cleaner, as well as supports more future cleanup. Author: Antonin Houska Reviewed-by: Álvaro Herrera and (older versions) Robert Haas Discussion: https://postgr.es/m/14984.1554998742@spoje.net
This commit is contained in:
@@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
|
||||
*
|
||||
* Note clearly that this function can access WAL during normal operation,
|
||||
* similarly to the way WALSender or Logical Decoding would do.
|
||||
*
|
||||
*/
|
||||
static void
|
||||
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
|
||||
@@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
|
||||
XLogReaderState *xlogreader;
|
||||
char *errormsg;
|
||||
|
||||
xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
|
||||
NULL);
|
||||
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
|
||||
&read_local_xlog_page, NULL);
|
||||
if (!xlogreader)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OUT_OF_MEMORY),
|
||||
|
@@ -885,8 +885,7 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
|
||||
int source, bool notfoundOk);
|
||||
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
|
||||
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
|
||||
TimeLineID *readTLI);
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
|
||||
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
|
||||
bool fetching_ckpt, XLogRecPtr tliRecPtr);
|
||||
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
|
||||
@@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata,
|
||||
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
|
||||
|
||||
if (!debug_reader)
|
||||
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
|
||||
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
|
||||
NULL, NULL);
|
||||
|
||||
if (!debug_reader)
|
||||
{
|
||||
@@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
|
||||
XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
|
||||
offset = XLogSegmentOffset(xlogreader->latestPagePtr,
|
||||
wal_segment_size);
|
||||
XLogFileName(fname, xlogreader->readPageTLI, segno,
|
||||
XLogFileName(fname, xlogreader->seg.ws_tli, segno,
|
||||
wal_segment_size);
|
||||
ereport(emode_for_corrupt_record(emode,
|
||||
RecPtr ? RecPtr : EndRecPtr),
|
||||
@@ -6353,7 +6353,8 @@ StartupXLOG(void)
|
||||
|
||||
/* Set up XLOG reader facility */
|
||||
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
|
||||
xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
|
||||
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
|
||||
&XLogPageRead, &private);
|
||||
if (!xlogreader)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OUT_OF_MEMORY),
|
||||
@@ -7355,7 +7356,7 @@ StartupXLOG(void)
|
||||
* and we were reading the old WAL from a segment belonging to a higher
|
||||
* timeline.
|
||||
*/
|
||||
EndOfLogTLI = xlogreader->readPageTLI;
|
||||
EndOfLogTLI = xlogreader->seg.ws_tli;
|
||||
|
||||
/*
|
||||
* Complain if we did not roll forward far enough to render the backup
|
||||
@@ -11523,7 +11524,7 @@ CancelBackup(void)
|
||||
*/
|
||||
static int
|
||||
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
|
||||
XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
|
||||
XLogRecPtr targetRecPtr, char *readBuf)
|
||||
{
|
||||
XLogPageReadPrivate *private =
|
||||
(XLogPageReadPrivate *) xlogreader->private_data;
|
||||
@@ -11640,7 +11641,7 @@ retry:
|
||||
Assert(targetPageOff == readOff);
|
||||
Assert(reqLen <= readLen);
|
||||
|
||||
*readTLI = curFileTLI;
|
||||
xlogreader->seg.ws_tli = curFileTLI;
|
||||
|
||||
/*
|
||||
* Check the page header immediately, so that we can retry immediately if
|
||||
|
@@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
|
||||
* Returns NULL if the xlogreader couldn't be allocated.
|
||||
*/
|
||||
XLogReaderState *
|
||||
XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
|
||||
void *private_data)
|
||||
XLogReaderAllocate(int wal_segment_size, const char *waldir,
|
||||
XLogPageReadCB pagereadfunc, void *private_data)
|
||||
{
|
||||
XLogReaderState *state;
|
||||
|
||||
@@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
state->wal_segment_size = wal_segment_size;
|
||||
/* Initialize segment info. */
|
||||
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
|
||||
waldir);
|
||||
|
||||
state->read_page = pagereadfunc;
|
||||
/* system_identifier initialized to zeroes above */
|
||||
state->private_data = private_data;
|
||||
@@ -198,6 +201,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize the passed segment structs.
|
||||
*/
|
||||
void
|
||||
WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
|
||||
int segsize, const char *waldir)
|
||||
{
|
||||
seg->ws_file = -1;
|
||||
seg->ws_segno = 0;
|
||||
seg->ws_off = 0;
|
||||
seg->ws_tli = 0;
|
||||
|
||||
segcxt->ws_segsize = segsize;
|
||||
if (waldir)
|
||||
snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempt to read an XLOG record.
|
||||
*
|
||||
@@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
|
||||
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
|
||||
{
|
||||
/* Pretend it extends to end of segment */
|
||||
state->EndRecPtr += state->wal_segment_size - 1;
|
||||
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
|
||||
state->EndRecPtr += state->segcxt.ws_segsize - 1;
|
||||
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
|
||||
}
|
||||
|
||||
if (DecodeXLogRecord(state, record, errormsg))
|
||||
@@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
|
||||
Assert((pageptr % XLOG_BLCKSZ) == 0);
|
||||
|
||||
XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
|
||||
targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
|
||||
XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
|
||||
targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
|
||||
|
||||
/* check whether we have all the requested data already */
|
||||
if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
|
||||
reqLen <= state->readLen)
|
||||
if (targetSegNo == state->seg.ws_segno &&
|
||||
targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
|
||||
return state->readLen;
|
||||
|
||||
/*
|
||||
@@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
* record is. This is so that we can check the additional identification
|
||||
* info that is present in the first page's "long" header.
|
||||
*/
|
||||
if (targetSegNo != state->readSegNo && targetPageOff != 0)
|
||||
if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
|
||||
{
|
||||
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
|
||||
|
||||
readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
|
||||
state->currRecPtr,
|
||||
state->readBuf, &state->readPageTLI);
|
||||
state->readBuf);
|
||||
if (readLen < 0)
|
||||
goto err;
|
||||
|
||||
@@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
*/
|
||||
readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
|
||||
state->currRecPtr,
|
||||
state->readBuf, &state->readPageTLI);
|
||||
state->readBuf);
|
||||
if (readLen < 0)
|
||||
goto err;
|
||||
|
||||
@@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
{
|
||||
readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
|
||||
state->currRecPtr,
|
||||
state->readBuf, &state->readPageTLI);
|
||||
state->readBuf);
|
||||
if (readLen < 0)
|
||||
goto err;
|
||||
}
|
||||
@@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
goto err;
|
||||
|
||||
/* update read state information */
|
||||
state->readSegNo = targetSegNo;
|
||||
state->readOff = targetPageOff;
|
||||
state->seg.ws_segno = targetSegNo;
|
||||
state->seg.ws_off = targetPageOff;
|
||||
state->readLen = readLen;
|
||||
|
||||
return readLen;
|
||||
@@ -625,8 +645,8 @@ err:
|
||||
static void
|
||||
XLogReaderInvalReadState(XLogReaderState *state)
|
||||
{
|
||||
state->readSegNo = 0;
|
||||
state->readOff = 0;
|
||||
state->seg.ws_segno = 0;
|
||||
state->seg.ws_off = 0;
|
||||
state->readLen = 0;
|
||||
}
|
||||
|
||||
@@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
|
||||
|
||||
Assert((recptr % XLOG_BLCKSZ) == 0);
|
||||
|
||||
XLByteToSeg(recptr, segno, state->wal_segment_size);
|
||||
offset = XLogSegmentOffset(recptr, state->wal_segment_size);
|
||||
XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
|
||||
offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
|
||||
|
||||
XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
|
||||
XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
|
||||
|
||||
if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
|
||||
XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
|
||||
|
||||
report_invalid_record(state,
|
||||
"invalid magic number %04X in log segment %s, offset %u",
|
||||
@@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
|
||||
XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
|
||||
|
||||
report_invalid_record(state,
|
||||
"invalid info bits %04X in log segment %s, offset %u",
|
||||
@@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
|
||||
(unsigned long long) state->system_identifier);
|
||||
return false;
|
||||
}
|
||||
else if (longhdr->xlp_seg_size != state->wal_segment_size)
|
||||
else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
|
||||
{
|
||||
report_invalid_record(state,
|
||||
"WAL file is from different database system: incorrect segment size in page header");
|
||||
@@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
|
||||
XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
|
||||
|
||||
/* hmm, first page of file doesn't have a long header? */
|
||||
report_invalid_record(state,
|
||||
@@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
|
||||
XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
|
||||
|
||||
report_invalid_record(state,
|
||||
"unexpected pageaddr %X/%X in log segment %s, offset %u",
|
||||
@@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
|
||||
{
|
||||
char fname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
|
||||
XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
|
||||
|
||||
report_invalid_record(state,
|
||||
"out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
|
||||
@@ -997,7 +1017,6 @@ out:
|
||||
|
||||
#endif /* FRONTEND */
|
||||
|
||||
|
||||
/* ----------------------------------------
|
||||
* Functions for decoding the data and block references in a record.
|
||||
* ----------------------------------------
|
||||
|
@@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
|
||||
void
|
||||
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
|
||||
{
|
||||
const XLogRecPtr lastReadPage = state->readSegNo *
|
||||
state->wal_segment_size + state->readOff;
|
||||
const XLogRecPtr lastReadPage = state->seg.ws_segno *
|
||||
state->segcxt.ws_segsize + state->seg.ws_off;
|
||||
|
||||
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
|
||||
Assert(wantLength <= XLOG_BLCKSZ);
|
||||
@@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
|
||||
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
|
||||
state->currTLI != ThisTimeLineID &&
|
||||
state->currTLI != 0 &&
|
||||
((wantPage + wantLength) / state->wal_segment_size) <
|
||||
(state->currTLIValidUntil / state->wal_segment_size))
|
||||
((wantPage + wantLength) / state->segcxt.ws_segsize) <
|
||||
(state->currTLIValidUntil / state->segcxt.ws_segsize))
|
||||
return;
|
||||
|
||||
/*
|
||||
@@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
|
||||
* by a promotion or replay from a cascaded replica.
|
||||
*/
|
||||
List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
|
||||
XLogRecPtr endOfSegment;
|
||||
|
||||
XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1)
|
||||
* state->wal_segment_size) - 1;
|
||||
|
||||
Assert(wantPage / state->wal_segment_size ==
|
||||
endOfSegment / state->wal_segment_size);
|
||||
endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
|
||||
state->segcxt.ws_segsize - 1;
|
||||
Assert(wantPage / state->segcxt.ws_segsize ==
|
||||
endOfSegment / state->segcxt.ws_segsize);
|
||||
|
||||
/*
|
||||
* Find the timeline of the last LSN on the segment containing
|
||||
@@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
|
||||
*/
|
||||
int
|
||||
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
|
||||
TimeLineID *pageTLI)
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
|
||||
{
|
||||
XLogRecPtr read_upto,
|
||||
loc;
|
||||
@@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
read_upto = GetFlushRecPtr();
|
||||
else
|
||||
read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
|
||||
|
||||
*pageTLI = ThisTimeLineID;
|
||||
state->seg.ws_tli = ThisTimeLineID;
|
||||
|
||||
/*
|
||||
* Check which timeline to get the record from.
|
||||
@@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
read_upto = state->currTLIValidUntil;
|
||||
|
||||
/*
|
||||
* Setting pageTLI to our wanted record's TLI is slightly wrong;
|
||||
* Setting ws_tli to our wanted record's TLI is slightly wrong;
|
||||
* the page might begin on an older timeline if it contains a
|
||||
* timeline switch, since its xlog segment will have been copied
|
||||
* from the prior timeline. This is pretty harmless though, as
|
||||
* nothing cares so long as the timeline doesn't go backwards. We
|
||||
* should read the page header instead; FIXME someday.
|
||||
*/
|
||||
*pageTLI = state->currTLI;
|
||||
state->seg.ws_tli = state->currTLI;
|
||||
|
||||
/* No need to wait on a historical timeline */
|
||||
break;
|
||||
@@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
* as 'count', read the whole page anyway. It's guaranteed to be
|
||||
* zero-padded up to the page boundary if it's incomplete.
|
||||
*/
|
||||
XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
|
||||
XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
|
||||
XLOG_BLCKSZ);
|
||||
|
||||
/* number of valid bytes in the buffer */
|
||||
|
@@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
|
||||
ctx->slot = slot;
|
||||
|
||||
ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
|
||||
ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
|
||||
if (!ctx->reader)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OUT_OF_MEMORY),
|
||||
|
@@ -116,10 +116,10 @@ check_permissions(void)
|
||||
|
||||
int
|
||||
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
|
||||
{
|
||||
return read_local_xlog_page(state, targetPagePtr, reqLen,
|
||||
targetRecPtr, cur_page, pageTLI);
|
||||
targetRecPtr, cur_page);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@@ -128,16 +128,8 @@ bool log_replication_commands = false;
|
||||
*/
|
||||
bool wake_wal_senders = false;
|
||||
|
||||
/*
|
||||
* These variables are used similarly to openLogFile/SegNo/Off,
|
||||
* but for walsender to read the XLOG.
|
||||
*/
|
||||
static int sendFile = -1;
|
||||
static XLogSegNo sendSegNo = 0;
|
||||
static uint32 sendOff = 0;
|
||||
|
||||
/* Timeline ID of the currently open file */
|
||||
static TimeLineID curFileTimeLine = 0;
|
||||
static WALOpenSegment *sendSeg = NULL;
|
||||
static WALSegmentContext *sendCxt = NULL;
|
||||
|
||||
/*
|
||||
* These variables keep track of the state of the timeline we're currently
|
||||
@@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
|
||||
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
|
||||
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
|
||||
|
||||
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
|
||||
static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
|
||||
|
||||
|
||||
/* Initialize walsender process before entering the main command loop */
|
||||
@@ -285,6 +277,13 @@ InitWalSender(void)
|
||||
|
||||
/* Initialize empty timestamp buffer for lag tracking. */
|
||||
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
|
||||
|
||||
/* Make sure we can remember the current read position in XLOG. */
|
||||
sendSeg = (WALOpenSegment *)
|
||||
MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
|
||||
sendCxt = (WALSegmentContext *)
|
||||
MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
|
||||
WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -301,10 +300,10 @@ WalSndErrorCleanup(void)
|
||||
ConditionVariableCancelSleep();
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (sendFile >= 0)
|
||||
if (sendSeg->ws_file >= 0)
|
||||
{
|
||||
close(sendFile);
|
||||
sendFile = -1;
|
||||
close(sendSeg->ws_file);
|
||||
sendSeg->ws_file = -1;
|
||||
}
|
||||
|
||||
if (MyReplicationSlot != NULL)
|
||||
@@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd)
|
||||
*/
|
||||
static int
|
||||
logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
|
||||
XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
|
||||
XLogRecPtr targetRecPtr, char *cur_page)
|
||||
{
|
||||
XLogRecPtr flushptr;
|
||||
int count;
|
||||
@@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
|
||||
count = flushptr - targetPagePtr; /* part of the page available */
|
||||
|
||||
/* now actually read the data, we know it's there */
|
||||
XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
|
||||
XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
|
||||
|
||||
return count;
|
||||
}
|
||||
@@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg)
|
||||
* more than one.
|
||||
*/
|
||||
static void
|
||||
XLogRead(char *buf, XLogRecPtr startptr, Size count)
|
||||
XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
|
||||
{
|
||||
char *p;
|
||||
XLogRecPtr recptr;
|
||||
@@ -2382,17 +2381,18 @@ retry:
|
||||
int segbytes;
|
||||
int readbytes;
|
||||
|
||||
startoff = XLogSegmentOffset(recptr, wal_segment_size);
|
||||
startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
|
||||
|
||||
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
|
||||
if (sendSeg->ws_file < 0 ||
|
||||
!XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
/* Switch to another logfile segment */
|
||||
if (sendFile >= 0)
|
||||
close(sendFile);
|
||||
if (sendSeg->ws_file >= 0)
|
||||
close(sendSeg->ws_file);
|
||||
|
||||
XLByteToSeg(recptr, sendSegNo, wal_segment_size);
|
||||
XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
|
||||
|
||||
/*-------
|
||||
* When reading from a historic timeline, and there is a timeline
|
||||
@@ -2420,20 +2420,20 @@ retry:
|
||||
* used portion of the old segment is copied to the new file.
|
||||
*-------
|
||||
*/
|
||||
curFileTimeLine = sendTimeLine;
|
||||
sendSeg->ws_tli = sendTimeLine;
|
||||
if (sendTimeLineIsHistoric)
|
||||
{
|
||||
XLogSegNo endSegNo;
|
||||
|
||||
XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
|
||||
if (sendSegNo == endSegNo)
|
||||
curFileTimeLine = sendTimeLineNextTLI;
|
||||
XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
|
||||
if (sendSeg->ws_segno == endSegNo)
|
||||
sendSeg->ws_tli = sendTimeLineNextTLI;
|
||||
}
|
||||
|
||||
XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
|
||||
XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
|
||||
|
||||
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
|
||||
if (sendFile < 0)
|
||||
sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
|
||||
if (sendSeg->ws_file < 0)
|
||||
{
|
||||
/*
|
||||
* If the file is not found, assume it's because the standby
|
||||
@@ -2444,58 +2444,58 @@ retry:
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("requested WAL segment %s has already been removed",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo))));
|
||||
XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\": %m",
|
||||
path)));
|
||||
}
|
||||
sendOff = 0;
|
||||
sendSeg->ws_off = 0;
|
||||
}
|
||||
|
||||
/* Need to seek in the file? */
|
||||
if (sendOff != startoff)
|
||||
if (sendSeg->ws_off != startoff)
|
||||
{
|
||||
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
|
||||
if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek in log segment %s to offset %u: %m",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
|
||||
startoff)));
|
||||
sendOff = startoff;
|
||||
sendSeg->ws_off = startoff;
|
||||
}
|
||||
|
||||
/* How many bytes are within this segment? */
|
||||
if (nbytes > (wal_segment_size - startoff))
|
||||
segbytes = wal_segment_size - startoff;
|
||||
if (nbytes > (segcxt->ws_segsize - startoff))
|
||||
segbytes = segcxt->ws_segsize - startoff;
|
||||
else
|
||||
segbytes = nbytes;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
|
||||
readbytes = read(sendFile, p, segbytes);
|
||||
readbytes = read(sendSeg->ws_file, p, segbytes);
|
||||
pgstat_report_wait_end();
|
||||
if (readbytes < 0)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log segment %s, offset %u, length %zu: %m",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
sendOff, (Size) segbytes)));
|
||||
XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
|
||||
sendSeg->ws_off, (Size) segbytes)));
|
||||
}
|
||||
else if (readbytes == 0)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
|
||||
XLogFileNameP(curFileTimeLine, sendSegNo),
|
||||
sendOff, readbytes, (Size) segbytes)));
|
||||
XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
|
||||
sendSeg->ws_off, readbytes, (Size) segbytes)));
|
||||
}
|
||||
|
||||
/* Update state for read */
|
||||
recptr += readbytes;
|
||||
|
||||
sendOff += readbytes;
|
||||
sendSeg->ws_off += readbytes;
|
||||
nbytes -= readbytes;
|
||||
p += readbytes;
|
||||
}
|
||||
@@ -2507,7 +2507,7 @@ retry:
|
||||
* read() succeeds in that case, but the data we tried to read might
|
||||
* already have been overwritten with new WAL records.
|
||||
*/
|
||||
XLByteToSeg(startptr, segno, wal_segment_size);
|
||||
XLByteToSeg(startptr, segno, segcxt->ws_segsize);
|
||||
CheckXLogRemoved(segno, ThisTimeLineID);
|
||||
|
||||
/*
|
||||
@@ -2526,10 +2526,10 @@ retry:
|
||||
walsnd->needreload = false;
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
|
||||
if (reload && sendFile >= 0)
|
||||
if (reload && sendSeg->ws_file >= 0)
|
||||
{
|
||||
close(sendFile);
|
||||
sendFile = -1;
|
||||
close(sendSeg->ws_file);
|
||||
sendSeg->ws_file = -1;
|
||||
|
||||
goto retry;
|
||||
}
|
||||
@@ -2695,9 +2695,9 @@ XLogSendPhysical(void)
|
||||
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
|
||||
{
|
||||
/* close the current file. */
|
||||
if (sendFile >= 0)
|
||||
close(sendFile);
|
||||
sendFile = -1;
|
||||
if (sendSeg->ws_file >= 0)
|
||||
close(sendSeg->ws_file);
|
||||
sendSeg->ws_file = -1;
|
||||
|
||||
/* Send CopyDone */
|
||||
pq_putmessage_noblock('c', NULL, 0);
|
||||
@@ -2768,7 +2768,7 @@ XLogSendPhysical(void)
|
||||
* calls.
|
||||
*/
|
||||
enlargeStringInfo(&output_message, nbytes);
|
||||
XLogRead(&output_message.data[output_message.len], startptr, nbytes);
|
||||
XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
|
||||
output_message.len += nbytes;
|
||||
output_message.data[output_message.len] = '\0';
|
||||
|
||||
|
Reference in New Issue
Block a user