1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

New WAL version - CRC and data blocks backup.

This commit is contained in:
Vadim B. Mikheev
2000-12-28 13:00:29 +00:00
parent c996c7f573
commit 7ceeeb662f
18 changed files with 1203 additions and 1202 deletions

View File

@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.44 2000/12/18 18:45:03 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.45 2000/12/28 13:00:08 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -31,6 +31,7 @@
#include "storage/proc.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "storage/bufpage.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "utils/builtins.h"
@ -43,6 +44,7 @@ XLogRecPtr MyLastRecPtr = {0, 0};
uint32 CritSectionCount = 0;
bool InRecovery = false;
StartUpID ThisStartUpID = 0;
XLogRecPtr RedoRecPtr;
int XLOG_DEBUG = 0;
@ -71,11 +73,12 @@ typedef struct XLgwrResult
typedef struct XLogCtlInsert
{
XLgwrResult LgwrResult;
XLogRecPtr PrevRecord;
uint16 curridx; /* current block index in cache */
XLogPageHeader currpage;
char *currpos;
XLgwrResult LgwrResult;
XLogRecPtr PrevRecord;
uint16 curridx; /* current block index in cache */
XLogPageHeader currpage;
char *currpos;
XLogRecPtr RedoRecPtr;
} XLogCtlInsert;
typedef struct XLogCtlWrite
@ -96,6 +99,7 @@ typedef struct XLogCtlData
uint32 XLogCacheByte;
uint32 XLogCacheBlck;
StartUpID ThisStartUpID;
XLogRecPtr RedoRecPtr; /* for postmaster */
slock_t insert_lck;
slock_t info_lck;
slock_t lgwr_lck;
@ -121,9 +125,7 @@ typedef enum DBState
typedef struct ControlFileData
{
/*
* XLOG state
*/
crc64 crc;
uint32 logId; /* current log file id */
uint32 logSeg; /* current log file segment (1-based) */
XLogRecPtr checkPoint; /* last check point record ptr */
@ -149,7 +151,6 @@ typedef struct ControlFileData
static ControlFileData *ControlFile = NULL;
typedef struct CheckPoint
{
XLogRecPtr redo; /* next RecPtr available when we */
@ -167,6 +168,13 @@ typedef struct CheckPoint
#define XLOG_CHECKPOINT 0x00
#define XLOG_NEXTOID 0x10
typedef struct BkpBlock
{
crc64 crc;
RelFileNode node;
BlockNumber block;
} BkpBlock;
/*
* We break each log file in 16Mb segments
*/
@ -208,6 +216,33 @@ typedef struct CheckPoint
(xrecoff % BLCKSZ >= SizeOfXLogPHD && \
(BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord)
#define _INTL_MAXLOGRECSZ (3 * MAXLOGRECSZ)
extern uint32 crc_table[];
#define INIT_CRC64(crc) (crc.crc1 = 0xffffffff, crc.crc2 = 0xffffffff)
#define FIN_CRC64(crc) (crc.crc1 ^= 0xffffffff, crc.crc2 ^= 0xffffffff)
#define COMP_CRC64(crc, data, len) \
{\
uint32 __c1 = crc.crc1;\
uint32 __c2 = crc.crc2;\
char *__data = data;\
uint32 __len = len;\
\
while (__len >= 2)\
{\
__c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\
__c2 = crc_table[(__c2 ^ *__data++) & 0xff] ^ (__c2 >> 8);\
__len -= 2;\
}\
if (__len > 0)\
__c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\
crc.crc1 = __c1;\
crc.crc2 = __c2;\
}
void SetRedoRecPtr(void);
void GetRedoRecPtr(void);
static void GetFreeXLBuffer(void);
static void XLogWrite(char *buffer);
static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent);
@ -238,17 +273,26 @@ static XLogRecord *nextRecord = NULL;
static bool InRedo = false;
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecord *record;
XLogSubRecord *subrecord;
XLogRecPtr RecPtr;
uint32 len = hdrlen + buflen,
freespace,
wlen;
uint32 freespace;
uint16 curridx;
XLogRecData *rdt;
Buffer dtbuf[2] = {InvalidBuffer, InvalidBuffer};
bool dtbuf_bkp[2] = {false, false};
XLogRecData dtbuf_rdt[4];
BkpBlock dtbuf_xlg[2];
XLogRecPtr dtbuf_lsn[2];
crc64 dtbuf_crc[2],
rdata_crc;
uint32 len;
unsigned i;
bool updrqst = false;
bool repeat = false;
bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
if (info & XLR_INFO_MASK)
@ -260,9 +304,6 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
info &= ~XLR_INFO_MASK;
}
if (len == 0 || len > MAXLOGRECSZ)
elog(STOP, "XLogInsert: invalid record len %u", len);
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
RecPtr.xlogid = 0;
@ -270,15 +311,72 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
return (RecPtr);
}
begin:;
INIT_CRC64(rdata_crc);
for (len = 0, rdt = rdata; ; )
{
if (rdt->buffer == InvalidBuffer)
{
len += rdt->len;
COMP_CRC64(rdata_crc, rdt->data, rdt->len);
if (rdt->next == NULL)
break;
rdt = rdt->next;
continue;
}
for (i = 0; i < 2; i++)
{
if (rdt->buffer == dtbuf[i])
{
if (dtbuf_bkp[i])
rdt->data = NULL;
else if (rdt->data)
{
len += rdt->len;
COMP_CRC64(rdata_crc, rdt->data, rdt->len);
}
break;
}
if (dtbuf[i] == InvalidBuffer)
{
dtbuf[i] = rdt->buffer;
dtbuf_lsn[i] = *((XLogRecPtr*)(BufferGetBlock(rdt->buffer)));
if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
{
crc64 crc;
dtbuf_bkp[i] = true;
rdt->data = NULL;
INIT_CRC64(crc);
COMP_CRC64(crc, ((char*)BufferGetBlock(dtbuf[i])), BLCKSZ);
dtbuf_crc[i] = crc;
}
else if (rdt->data)
{
len += rdt->len;
COMP_CRC64(rdata_crc, rdt->data, rdt->len);
}
break;
}
}
if (i >= 2)
elog(STOP, "XLogInsert: can backup 2 blocks at most");
if (rdt->next == NULL)
break;
rdt = rdt->next;
}
if (len == 0 || len > MAXLOGRECSZ)
elog(STOP, "XLogInsert: invalid record len %u", len);
START_CRIT_CODE;
/* obtain xlog insert lock */
if (TAS(&(XLogCtl->insert_lck))) /* busy */
{
bool do_lgwr = true;
unsigned i = 0;
for (;;)
for (i = 0;;)
{
/* try to read LgwrResult while waiting for insert lock */
if (!TAS(&(XLogCtl->info_lck)))
@ -319,6 +417,59 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
}
}
/* Race condition: RedoRecPtr was changed */
RedoRecPtr = Insert->RedoRecPtr;
repeat = false;
for (i = 0; i < 2; i++)
{
if (dtbuf[i] == InvalidBuffer)
continue;
if (dtbuf_bkp[i] == false &&
XLByteLE(dtbuf_lsn[i], RedoRecPtr))
{
dtbuf[i] = InvalidBuffer;
repeat = true;
}
}
if (repeat)
{
S_UNLOCK(&(XLogCtl->insert_lck));
END_CRIT_CODE;
goto begin;
}
/* Attach backup blocks to record data */
for (i = 0; i < 2; i++)
{
if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
continue;
info |= (XLR_SET_BKP_BLOCK(i));
dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
COMP_CRC64(dtbuf_crc[i],
((char*)&(dtbuf_xlg[i]) + offsetof(BkpBlock, node)),
(sizeof(BkpBlock) - offsetof(BkpBlock, node)));
FIN_CRC64(dtbuf_crc[i]);
dtbuf_xlg[i].crc = dtbuf_crc[i];
rdt->next = &(dtbuf_rdt[2 * i]);
dtbuf_rdt[2 * i].data = (char*)&(dtbuf_xlg[i]);
dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
len += sizeof(BkpBlock);
rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
dtbuf_rdt[2 * i + 1].data = (char*)(BufferGetBlock(dtbuf[i]));
dtbuf_rdt[2 * i + 1].len = BLCKSZ;
len += BLCKSZ;
dtbuf_rdt[2 * i + 1].next = NULL;
}
/* Insert record */
freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
if (freespace < SizeOfXLogRecord)
{
@ -344,10 +495,15 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
record->xl_xact_prev = MyLastRecPtr;
record->xl_xid = GetCurrentTransactionId();
record->xl_len = (len > freespace) ? freespace : len;
record->xl_info = (len > freespace) ?
(info | XLR_TO_BE_CONTINUED) : info;
record->xl_len = len;
record->xl_info = info;
record->xl_rmid = rmid;
COMP_CRC64(rdata_crc, ((char*)record + offsetof(XLogRecord, xl_prev)),
(SizeOfXLogRecord - offsetof(XLogRecord, xl_prev)));
FIN_CRC64(rdata_crc);
record->xl_crc = rdata_crc;
RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
RecPtr.xrecoff =
XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
@ -366,10 +522,10 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
sprintf(buf, "INSERT @ %u/%u: ", RecPtr.xlogid, RecPtr.xrecoff);
xlog_outrec(buf, record);
if (hdr != NULL)
if (rdata->data != NULL)
{
strcat(buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, hdr);
RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
}
strcat(buf, "\n");
write(2, buf, strlen(buf));
@ -377,31 +533,33 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32
MyLastRecPtr = RecPtr; /* begin of record */
Insert->currpos += SizeOfXLogRecord;
if (freespace > 0)
{
wlen = (hdrlen > freespace) ? freespace : hdrlen;
memcpy(Insert->currpos, hdr, wlen);
freespace -= wlen;
hdrlen -= wlen;
hdr += wlen;
Insert->currpos += wlen;
if (buflen > 0 && freespace > 0)
{
wlen = (buflen > freespace) ? freespace : buflen;
memcpy(Insert->currpos, buf, wlen);
freespace -= wlen;
buflen -= wlen;
buf += wlen;
Insert->currpos += wlen;
}
Insert->currpos = ((char *) Insert->currpage) +
MAXALIGN(Insert->currpos - ((char *) Insert->currpage));
len = hdrlen + buflen;
}
if (len != 0)
while (len)
{
nbuf:
while (rdata->data == NULL)
rdata = rdata->next;
if (freespace > 0)
{
if (rdata->len > freespace)
{
memcpy(Insert->currpos, rdata->data, freespace);
rdata->data += freespace;
rdata->len -= freespace;
len -= freespace;
}
else
{
memcpy(Insert->currpos, rdata->data, rdata->len);
freespace -= rdata->len;
len -= rdata->len;
Insert->currpos += rdata->len;
rdata = rdata->next;
continue;
}
}
/* Use next buffer */
curridx = NextBufIdx(curridx);
if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
{
@ -409,55 +567,16 @@ nbuf:
updrqst = true;
}
else
{
GetFreeXLBuffer();
updrqst = false;
}
freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD;
subrecord = (XLogSubRecord *) Insert->currpos;
subrecord->xl_len = len;
Insert->currpos += SizeOfXLogSubRecord;
if (hdrlen > freespace)
{
subrecord->xl_len = freespace;
/* we don't store info in subrecord' xl_info */
subrecord->xl_info = XLR_TO_BE_CONTINUED;
memcpy(Insert->currpos, hdr, freespace);
hdrlen -= freespace;
hdr += freespace;
goto nbuf;
}
else if (hdrlen > 0)
{
subrecord->xl_len = hdrlen;
memcpy(Insert->currpos, hdr, hdrlen);
Insert->currpos += hdrlen;
freespace -= hdrlen;
hdrlen = 0;
}
else
subrecord->xl_len = 0;
if (buflen > freespace)
{
subrecord->xl_len += freespace;
/* we don't store info in subrecord' xl_info */
subrecord->xl_info = XLR_TO_BE_CONTINUED;
memcpy(Insert->currpos, buf, freespace);
buflen -= freespace;
buf += freespace;
goto nbuf;
}
else if (buflen > 0)
{
subrecord->xl_len += buflen;
memcpy(Insert->currpos, buf, buflen);
Insert->currpos += buflen;
}
/* we don't store info in subrecord' xl_info */
subrecord->xl_info = 0;
Insert->currpos = ((char *) Insert->currpage) +
MAXALIGN(Insert->currpos - ((char *) Insert->currpage));
}
Insert->currpos = ((char *) Insert->currpage) +
MAXALIGN(Insert->currpos - ((char *) Insert->currpage));
freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
/*
@ -469,12 +588,9 @@ nbuf:
XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
Insert->currpos - ((char *) Insert->currpage);
/*
* All done! Update global LgwrRqst if some block was filled up.
*/
/* Need to update global LgwrRqst if some block was filled up */
if (freespace < SizeOfXLogRecord)
updrqst = true; /* curridx is filled and available for
* writing out */
updrqst = true; /* curridx is filled and available for writing out */
else
curridx = PrevBufIdx(curridx);
LgwrRqst.Write = XLogCtl->xlblocks[curridx];
@ -483,8 +599,6 @@ nbuf:
if (updrqst)
{
unsigned i = 0;
for (;;)
{
if (!TAS(&(XLogCtl->info_lck)))
@ -959,11 +1073,117 @@ MoveOfflineLogs(char *archdir, uint32 _logId, uint32 _logSeg)
closedir(xldir);
}
static void
RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
{
Relation reln;
Buffer buffer;
Page page;
BkpBlock bkpb;
char *blk;
int i;
for (i = 0, blk = (char*)XLogRecGetData(record) + record->xl_len; i < 2; i++)
{
if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
continue;
memcpy((char*)&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);
if (reln)
{
buffer = XLogReadBuffer(true, reln, bkpb.block);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
memcpy((char*)page, blk, BLCKSZ);
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
blk += BLCKSZ;
}
}
static bool
RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{
crc64 crc;
crc64 cbuf;
int i;
uint32 len = record->xl_len;
char *blk;
for (i = 0; i < 2; i++)
{
if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
continue;
if (len <= (sizeof(BkpBlock) + BLCKSZ))
{
elog(emode, "ReadRecord: record at %u/%u is too short to keep bkp block",
recptr.xlogid, recptr.xrecoff);
return(false);
}
len -= sizeof(BkpBlock);
len -= BLCKSZ;
}
/* CRC of rmgr data */
INIT_CRC64(crc);
COMP_CRC64(crc, ((char*)XLogRecGetData(record)), len);
COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)),
(SizeOfXLogRecord - offsetof(XLogRecord, xl_prev)));
FIN_CRC64(crc);
if (record->xl_crc.crc1 != crc.crc1 || record->xl_crc.crc2 != crc.crc2)
{
elog(emode, "ReadRecord: bad rmgr data CRC in record at %u/%u",
recptr.xlogid, recptr.xrecoff);
return(false);
}
if (record->xl_len == len)
return(true);
for (i = 0, blk = (char*)XLogRecGetData(record) + len; i < 2; i++)
{
if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
continue;
INIT_CRC64(crc);
COMP_CRC64(crc, (blk + sizeof(BkpBlock)), BLCKSZ);
COMP_CRC64(crc, (blk + offsetof(BkpBlock, node)),
(sizeof(BkpBlock) - offsetof(BkpBlock, node)));
FIN_CRC64(crc);
memcpy((char*)&cbuf, blk, sizeof(crc64));
if (cbuf.crc1 != crc.crc1 || cbuf.crc2 != crc.crc2)
{
elog(emode, "ReadRecord: bad bkp block %d CRC in record at %u/%u",
i + 1, recptr.xlogid, recptr.xrecoff);
return(false);
}
blk += sizeof(BkpBlock);
blk += BLCKSZ;
}
record->xl_len = len; /* !!! */
return(true);
}
static XLogRecord *
ReadRecord(XLogRecPtr *RecPtr, char *buffer)
{
XLogRecord *record;
XLogRecPtr tmpRecPtr = EndRecPtr;
uint32 len;
bool nextmode = (RecPtr == NULL);
int emode = (nextmode) ? LOG : STOP;
bool noBlck = false;
@ -1032,11 +1252,10 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
got_record:;
if (record->xl_len >
(BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
if (record->xl_len > _INTL_MAXLOGRECSZ)
{
elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
elog(emode, "ReadRecord: too long record len %u in (%u, %u)",
record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
goto next_record_is_invalid;
}
if (record->xl_rmid > RM_MAX_ID)
@ -1046,21 +1265,15 @@ got_record:;
goto next_record_is_invalid;
}
nextRecord = NULL;
if (record->xl_info & XLR_TO_BE_CONTINUED)
len = BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord;
if (record->xl_len > len)
{
XLogSubRecord *subrecord;
uint32 len = record->xl_len;
XLogSubRecord *subrecord;
uint32 gotlen = len;
if (MAXALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ +
SizeOfXLogRecord != BLCKSZ)
{
elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)",
record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
goto next_record_is_invalid;
}
memcpy(buffer, record, record->xl_len + SizeOfXLogRecord);
memcpy(buffer, record, len + SizeOfXLogRecord);
record = (XLogRecord *) buffer;
buffer += record->xl_len + SizeOfXLogRecord;
buffer += len + SizeOfXLogRecord;
for (;;)
{
readOff++;
@ -1095,42 +1308,39 @@ got_record:;
goto next_record_is_invalid;
}
subrecord = (XLogSubRecord *) ((char *) readBuf + SizeOfXLogPHD);
if (subrecord->xl_len == 0 || subrecord->xl_len >
(BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord))
if (subrecord->xl_len == 0 ||
record->xl_len < (subrecord->xl_len + gotlen))
{
elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u",
subrecord->xl_len, readId, readSeg, readOff);
goto next_record_is_invalid;
}
len += subrecord->xl_len;
if (len > MAXLOGRECSZ)
len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
if (subrecord->xl_len > len)
{
elog(emode, "ReadRecord: too long record len %u in (%u, %u)",
len, RecPtr->xlogid, RecPtr->xrecoff);
memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, len);
gotlen += len;
buffer += len;
continue;
}
if (record->xl_len != (subrecord->xl_len + gotlen))
{
elog(emode, "ReadRecord: invalid len %u of constracted record in logfile %u seg %u off %u",
subrecord->xl_len + gotlen, readId, readSeg, readOff);
goto next_record_is_invalid;
}
memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, subrecord->xl_len);
buffer += subrecord->xl_len;
if (subrecord->xl_info & XLR_TO_BE_CONTINUED)
{
if (MAXALIGN(subrecord->xl_len) +
SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ)
{
elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u",
subrecord->xl_len, readId, readSeg, readOff);
goto next_record_is_invalid;
}
continue;
}
break;
}
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(subrecord->xl_len) +
SizeOfXLogPHD + SizeOfXLogSubRecord)
{
nextRecord = (XLogRecord *) ((char *) subrecord +
MAXALIGN(subrecord->xl_len) + SizeOfXLogSubRecord);
}
record->xl_len = len;
EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ +
SizeOfXLogPHD + SizeOfXLogSubRecord +
@ -1138,6 +1348,8 @@ got_record:;
ReadRecPtr = *RecPtr;
return (record);
}
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(record->xl_len) +
RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
nextRecord = (XLogRecord *) ((char *) record +
@ -1322,6 +1534,13 @@ WriteControlFile(void)
*/
if (sizeof(ControlFileData) > BLCKSZ)
elog(STOP, "sizeof(ControlFileData) is too large ... fix xlog.c");
INIT_CRC64(ControlFile->crc);
COMP_CRC64(ControlFile->crc,
((char*)ControlFile + offsetof(ControlFileData, logId)),
(sizeof(ControlFileData) - offsetof(ControlFileData, logId)));
FIN_CRC64(ControlFile->crc);
memset(buffer, 0, BLCKSZ);
memcpy(buffer, ControlFile, sizeof(ControlFileData));
@ -1342,6 +1561,7 @@ WriteControlFile(void)
static void
ReadControlFile(void)
{
crc64 crc;
int fd;
/*
@ -1356,6 +1576,15 @@ ReadControlFile(void)
close(fd);
INIT_CRC64(crc);
COMP_CRC64(crc,
((char*)ControlFile + offsetof(ControlFileData, logId)),
(sizeof(ControlFileData) - offsetof(ControlFileData, logId)));
FIN_CRC64(crc);
if (crc.crc1 != ControlFile->crc.crc1 || crc.crc2 != ControlFile->crc.crc2)
elog(STOP, "Invalid CRC in control file");
/*
* Do compatibility checking immediately. We do this here for 2 reasons:
*
@ -1396,6 +1625,12 @@ UpdateControlFile(void)
{
int fd;
INIT_CRC64(ControlFile->crc);
COMP_CRC64(ControlFile->crc,
((char*)ControlFile + offsetof(ControlFileData, logId)),
(sizeof(ControlFileData) - offsetof(ControlFileData, logId)));
FIN_CRC64(ControlFile->crc);
fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
elog(STOP, "open(\"%s\") failed: %m", ControlFilePath);
@ -1461,6 +1696,7 @@ BootStrapXLOG()
bool usexistent = false;
XLogPageHeader page = (XLogPageHeader) buffer;
XLogRecord *record;
crc64 crc;
checkPoint.redo.xlogid = 0;
checkPoint.redo.xrecoff = SizeOfXLogPHD;
@ -1487,6 +1723,13 @@ BootStrapXLOG()
record->xl_rmid = RM_XLOG_ID;
memcpy((char *) record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
INIT_CRC64(crc);
COMP_CRC64(crc, ((char*)&checkPoint), sizeof(checkPoint));
COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)),
(SizeOfXLogRecord - offsetof(XLogRecord, xl_prev)));
FIN_CRC64(crc);
record->xl_crc = crc;
logFile = XLogFileInit(0, 0, &usexistent);
if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
@ -1532,7 +1775,7 @@ StartupXLOG()
XLogRecPtr RecPtr,
LastRec;
XLogRecord *record;
char buffer[MAXLOGRECSZ + SizeOfXLogRecord];
char buffer[_INTL_MAXLOGRECSZ + SizeOfXLogRecord];
elog(LOG, "starting up");
CritSectionCount++;
@ -1611,6 +1854,8 @@ StartupXLOG()
ShmemVariableCache->oidCount = 0;
ThisStartUpID = checkPoint.ThisStartUpID;
RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
XLogCtl->RedoRecPtr = checkPoint.redo;
if (XLByteLT(RecPtr, checkPoint.redo))
elog(STOP, "Invalid redo in checkPoint record");
@ -1648,8 +1893,7 @@ StartupXLOG()
/* Is REDO required ? */
if (XLByteLT(checkPoint.redo, RecPtr))
record = ReadRecord(&(checkPoint.redo), buffer);
else
/* read past CheckPoint record */
else /* read past CheckPoint record */
record = ReadRecord(NULL, buffer);
if (record->xl_len != 0)
@ -1676,6 +1920,9 @@ StartupXLOG()
write(2, buf, strlen(buf));
}
if (record->xl_info & (XLR_BKP_BLOCK_1|XLR_BKP_BLOCK_2))
RestoreBkpBlocks(record, EndRecPtr);
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
record = ReadRecord(NULL, buffer);
} while (record->xl_len != 0);
@ -1758,13 +2005,31 @@ StartupXLOG()
}
/*
* Postmaster uses it to set ThisStartUpID from XLogCtlData
* located in shmem after successful startup.
* Postmaster uses it to set ThisStartUpID & RedoRecPtr from
* XLogCtlData located in shmem after successful startup.
*/
void
SetThisStartUpID(void)
{
ThisStartUpID = XLogCtl->ThisStartUpID;
RedoRecPtr = XLogCtl->RedoRecPtr;
}
/*
* CheckPoint-er called by postmaster creates copy of RedoRecPtr
* for postmaster in shmem. Postmaster uses GetRedoRecPtr after
* that to update its own copy of RedoRecPtr.
*/
void
SetRedoRecPtr(void)
{
XLogCtl->RedoRecPtr = RedoRecPtr;
}
void
GetRedoRecPtr(void)
{
RedoRecPtr = XLogCtl->RedoRecPtr;
}
/*
@ -1791,6 +2056,7 @@ CreateCheckPoint(bool shutdown)
CheckPoint checkPoint;
XLogRecPtr recptr;
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecData rdata;
uint32 freespace;
uint16 curridx;
uint32 _logId;
@ -1844,6 +2110,7 @@ CreateCheckPoint(bool shutdown)
checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid;
checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
Insert->currpos - ((char *) Insert->currpage);
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
S_UNLOCK(&(XLogCtl->insert_lck));
SpinAcquire(XidGenLockId);
@ -1864,8 +2131,12 @@ CreateCheckPoint(bool shutdown)
if (shutdown && checkPoint.undo.xrecoff != 0)
elog(STOP, "Active transaction while data base is shutting down");
recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, (char *) &checkPoint,
sizeof(checkPoint), NULL, 0);
rdata.buffer = InvalidBuffer;
rdata.data = (char *)(&checkPoint);
rdata.len = sizeof(checkPoint);
rdata.next = NULL;
recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, &rdata);
if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
elog(STOP, "XLog concurrent activity while data base is shutting down");
@ -1941,10 +2212,14 @@ void XLogPutNextOid(Oid nextOid);
void
XLogPutNextOid(Oid nextOid)
{
(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID,
(char *) &nextOid, sizeof(Oid), NULL, 0);
}
XLogRecData rdata;
rdata.buffer = InvalidBuffer;
rdata.data = (char *)(&nextOid);
rdata.len = sizeof(Oid);
rdata.next = NULL;
(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
}
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)