1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-29 10:41:53 +03:00

XLOG (also known as WAL -:)) Bootstrap/Startup/Shutdown.

First step in cleaning up backend initialization code.
Fix for FATAL: now FATAL is ERROR + exit.
This commit is contained in:
Vadim B. Mikheev
1999-10-06 21:58:18 +00:00
parent 9dcd8c528f
commit 4793740367
20 changed files with 1158 additions and 1015 deletions

View File

@ -14,12 +14,14 @@
void UpdateControlFile(void);
int XLOGShmemSize(void);
void XLOGShmemInit(void);
void BootStrapXLOG(void);
void StartupXLOG(void);
void ShutdownXLOG(void);
void CreateCheckPoint(bool shutdown);
char *XLogDir = NULL;
char *ControlFilePath = NULL;
char XLogDir[MAXPGPATH+1];
char ControlFilePath[MAXPGPATH+1];
uint32 XLOGbuffers = 0;
XLogRecPtr MyLastRecPtr = {0, 0};
bool StopIfError = false;
@ -81,7 +83,8 @@ static XLogCtlData *XLogCtl = NULL;
typedef enum DBState
{
DB_SHUTDOWNED = 1,
DB_STARTUP = 0,
DB_SHUTDOWNED,
DB_SHUTDOWNING,
DB_IN_RECOVERY,
DB_IN_PRODUCTION
@ -114,9 +117,9 @@ typedef struct CheckPoint
} CheckPoint;
/*
* We break each log file in 64Mb segments
* We break each log file in 16Mb segments
*/
#define XLogSegSize (64*1024*1024)
#define XLogSegSize (16*1024*1024)
#define XLogLastSeg (0xffffffff / XLogSegSize)
#define XLogFileSize (XLogLastSeg * XLogSegSize)
@ -166,6 +169,7 @@ static void XLogWrite(char *buffer);
static int XLogFileInit(uint32 log, uint32 seg);
static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
static char *str_time(time_t tnow);
static XLgwrResult LgwrResult = {{0, 0}, {0, 0}};
static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}};
@ -173,14 +177,14 @@ static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}};
static int logFile = -1;
static uint32 logId = 0;
static uint32 logSeg = 0;
static off_t logOff = 0;
static uint32 logOff = 0;
static XLogRecPtr ReadRecPtr;
static XLogRecPtr EndRecPtr;
static int readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static off_t readOff = (off_t) -1;
static uint32 readOff = 0;
static char readBuf[BLCKSZ];
static XLogRecord *nextRecord = NULL;
@ -262,7 +266,13 @@ XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
freespace -= SizeOfXLogRecord;
record = (XLogRecord*) Insert->currpos;
record->xl_prev = Insert->PrevRecord;
record->xl_xact_prev = MyLastRecPtr;
if (rmid != RM_XLOG_ID)
record->xl_xact_prev = MyLastRecPtr;
else
{
record->xl_xact_prev.xlogid = 0;
record->xl_xact_prev.xrecoff = 0;
}
record->xl_xid = GetCurrentTransactionId();
record->xl_len = (len > freespace) ? freespace : len;
record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0;
@ -271,7 +281,7 @@ XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
RecPtr.xrecoff =
XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
Insert->currpos - ((char*) Insert->currpage);
if (MyLastRecPtr.xrecoff == 0)
if (MyLastRecPtr.xrecoff == 0 && rmid != RM_XLOG_ID)
{
SpinAcquire(SInvalLock);
MyProc->logRec = RecPtr;
@ -489,7 +499,7 @@ XLogFlush(XLogRecPtr record)
{
logId = LgwrResult.Write.xlogid;
logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
logOff = (off_t) 0;
logOff = 0;
logFile = XLogFileOpen(logId, logSeg, false);
}
@ -612,7 +622,7 @@ XLogWrite(char *buffer)
}
logId = LgwrResult.Write.xlogid;
logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
logOff = (off_t) 0;
logOff = 0;
logFile = XLogFileInit(logId, logSeg);
SpinAcquire(ControlFileLockId);
ControlFile->logId = logId;
@ -626,14 +636,14 @@ XLogWrite(char *buffer)
{
logId = LgwrResult.Write.xlogid;
logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
logOff = (off_t) 0;
logOff = 0;
logFile = XLogFileOpen(logId, logSeg, false);
}
if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
{
logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
if (lseek(logFile, logOff, SEEK_SET) < 0)
if (lseek(logFile, (off_t)logOff, SEEK_SET) < 0)
elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d",
logId, logSeg, logOff, errno);
}
@ -717,6 +727,10 @@ tryAgain:
elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
logId, logSeg, errno);
if (lseek(fd, 0, SEEK_SET) < 0)
elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d",
log, seg, 0, errno);
return(fd);
}
@ -753,327 +767,6 @@ tryAgain:
return(fd);
}
void
UpdateControlFile()
{
int fd;
tryAgain:
fd = open(ControlFilePath, O_RDWR);
if (fd < 0 && (errno == EMFILE || errno == ENFILE))
{
fd = errno;
if (!ReleaseDataFile())
elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
fd);
goto tryAgain;
}
if (fd < 0)
elog(STOP, "Open(cntlfile) failed: %d", errno);
if (write(fd, ControlFile, BLCKSZ) != BLCKSZ)
elog(STOP, "Write(cntlfile) failed: %d", errno);
if (fsync(fd) != 0)
elog(STOP, "Fsync(cntlfile) failed: %d", errno);
close(fd);
return;
}
int
XLOGShmemSize()
{
if (XLOGbuffers < MinXLOGbuffers)
XLOGbuffers = MinXLOGbuffers;
return(sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ);
}
/*
* This func must be called ONCE on system install
*/
void
BootStrapXLOG()
{
int fd;
char buffer[BLCKSZ];
XLogPageHeader page = (XLogPageHeader)buffer;
CheckPoint checkPoint;
XLogRecord *record;
fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
if (fd < 0)
elog(STOP, "BootStrapXLOG failed to create control file: %d", errno);
logFile = XLogFileInit(0, 0);
checkPoint.redo.xlogid = 0;
checkPoint.redo.xrecoff = SizeOfXLogPHD;
checkPoint.undo = checkPoint.redo;
checkPoint.nextXid = FirstTransactionId;
checkPoint.nextOid = BootstrapObjectIdData;
memset(buffer, 0, BLCKSZ);
page->xlp_magic = XLOG_PAGE_MAGIC;
page->xlp_info = 0;
record = (XLogRecord*) ((char*)page + SizeOfXLogPHD);
record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0;
record->xl_xact_prev = record->xl_prev;
record->xl_xid = InvalidTransactionId;
record->xl_len = sizeof(checkPoint);
record->xl_info = 0;
record->xl_rmid = RM_XLOG_ID;
memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno);
if (fsync(logFile) != 0)
elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno);
close(logFile);
logFile = -1;
memset(buffer, 0, BLCKSZ);
ControlFile = (ControlFileData*) buffer;
ControlFile->logId = 0;
ControlFile->logSeg = 1;
ControlFile->checkPoint = checkPoint.redo;
ControlFile->time = time(NULL);
ControlFile->state = DB_SHUTDOWNED;
if (write(fd, buffer, BLCKSZ) != BLCKSZ)
elog(STOP, "BootStrapXLOG failed to write control file: %d", errno);
if (fsync(fd) != 0)
elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno);
close(fd);
return;
}
/*
* This func must be called ONCE on system startup
*/
void
StartupXLOG()
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
CheckPoint checkPoint;
XLogRecPtr RecPtr,
LastRec;
XLogRecord *record;
char buffer[MAXLOGRECSZ+SizeOfXLogRecord];
int fd;
bool found;
bool recovery = false;
bool sie_saved = false;
elog(LOG, "Starting up XLOG manager...");
if (XLOGbuffers < MinXLOGbuffers)
XLOGbuffers = MinXLOGbuffers;
ControlFile = (ControlFileData*)
ShmemInitStruct("Control File", BLCKSZ, &found);
Assert(!found);
XLogCtl = (XLogCtlData*)
ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
sizeof(XLogRecPtr) * XLOGbuffers, &found);
Assert(!found);
XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData));
XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
XLogCtl->LgwrRqst = LgwrRqst;
XLogCtl->LgwrResult = LgwrResult;
XLogCtl->Insert.LgwrResult = LgwrResult;
XLogCtl->Insert.curridx = 0;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
XLogCtl->Write.LgwrResult = LgwrResult;
XLogCtl->Write.curridx = 0;
/*
* Open/read Control file
*/
tryAgain:
fd = open(ControlFilePath, O_RDWR);
if (fd < 0 && (errno == EMFILE || errno == ENFILE))
{
fd = errno;
if (!ReleaseDataFile())
elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
fd);
goto tryAgain;
}
if (fd < 0)
elog(STOP, "Open(cntlfile) failed: %d", errno);
if (read(fd, ControlFile, BLCKSZ) != BLCKSZ)
elog(STOP, "Read(cntlfile) failed: %d", errno);
close(fd);
if (ControlFile->logSeg == 0 ||
ControlFile->time <= 0 ||
ControlFile->state < DB_SHUTDOWNED ||
ControlFile->state > DB_IN_PRODUCTION ||
ControlFile->checkPoint.xlogid == 0 ||
ControlFile->checkPoint.xrecoff == 0)
elog(STOP, "Control file context is broken");
if (ControlFile->state == DB_SHUTDOWNED)
elog(LOG, "Data Base System was properly shutdowned at %s",
ctime(&(ControlFile->time)));
else if (ControlFile->state == DB_SHUTDOWNING)
elog(LOG, "Data Base System was interrupted while shutting down at %s",
ctime(&(ControlFile->time)));
else if (ControlFile->state == DB_IN_RECOVERY)
{
elog(LOG, "Data Base System was interrupted being in recovery at %s\n"
"This propably means that some data blocks are corrupted\n"
"And you will have to use last backup for recovery",
ctime(&(ControlFile->time)));
}
else if (ControlFile->state == DB_IN_PRODUCTION)
elog(LOG, "Data Base System was interrupted being in production at %s",
ctime(&(ControlFile->time)));
LastRec = RecPtr = ControlFile->checkPoint;
if (!XRecOffIsValid(RecPtr.xrecoff))
elog(STOP, "Invalid checkPoint in control file");
elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
record = ReadRecord(&RecPtr, buffer);
if (record->xl_rmid != RM_XLOG_ID)
elog(STOP, "Invalid RMID in checkPoint record");
if (record->xl_len != sizeof(checkPoint))
elog(STOP, "Invalid length of checkPoint record");
checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord));
elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)",
checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
checkPoint.undo.xlogid, checkPoint.undo.xrecoff);
elog(LOG, "NextTransactionId: %u; NextOid: %u)",
checkPoint.nextXid, checkPoint.nextOid);
if (checkPoint.nextXid < FirstTransactionId ||
checkPoint.nextOid < BootstrapObjectIdData)
elog(LOG, "Invalid NextTransactionId/NextOid");
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
if (XLByteLT(RecPtr, checkPoint.redo))
elog(STOP, "Invalid redo in checkPoint record");
if (checkPoint.undo.xrecoff == 0)
checkPoint.undo = RecPtr;
if (XLByteLT(RecPtr, checkPoint.undo))
elog(STOP, "Invalid undo in checkPoint record");
if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr))
{
if (ControlFile->state == DB_SHUTDOWNED)
elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
recovery = true;
}
else if (ControlFile->state != DB_SHUTDOWNED)
recovery = true;
if (recovery)
{
elog(LOG, "The DataBase system was not properly shutdowned\n"
"Automatic recovery is in progress...");
ControlFile->state = DB_IN_RECOVERY;
ControlFile->time = time(NULL);
UpdateControlFile();
sie_saved = StopIfError;
StopIfError = true;
/* Is REDO required ? */
if (XLByteLT(checkPoint.redo, RecPtr))
record = ReadRecord(&(checkPoint.redo), buffer);
else /* read past CheckPoint record */
record = ReadRecord(NULL, buffer);
/* REDO */
if (record->xl_len != 0)
{
elog(LOG, "Redo starts at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
do
{
if (record->xl_xid >= ShmemVariableCache->nextXid)
ShmemVariableCache->nextXid = record->xl_xid + 1;
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
record = ReadRecord(NULL, buffer);
} while (record->xl_len != 0);
elog(LOG, "Redo done at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
LastRec = ReadRecPtr;
}
else
elog(LOG, "Redo is not required");
/* UNDO */
RecPtr = ReadRecPtr;
if (XLByteLT(checkPoint.undo, RecPtr))
{
elog(LOG, "Undo starts at (%u, %u)",
RecPtr.xlogid, RecPtr.xrecoff);
do
{
record = ReadRecord(&RecPtr, buffer);
if (TransactionIdIsValid(record->xl_xid) &&
!TransactionIdDidCommit(record->xl_xid))
RmgrTable[record->xl_rmid].rm_undo(record);
RecPtr = record->xl_prev;
} while (XLByteLE(checkPoint.undo, RecPtr));
elog(LOG, "Undo done at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
}
else
elog(LOG, "Undo is not required");
}
/* Init xlog buffer cache */
record = ReadRecord(&LastRec, buffer);
logId = EndRecPtr.xlogid;
logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
logOff = 0;
logFile = XLogFileOpen(logId, logSeg, false);
XLogCtl->xlblocks[0].xlogid = logId;
XLogCtl->xlblocks[0].xrecoff =
((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
Insert->currpos = ((char*) Insert->currpage) +
(EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
if (recovery)
{
int i;
/*
* Let resource managers know that recovery is done
*/
for (i = 0; i <= RM_MAX_ID; i++)
RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL);
CreateCheckPoint(true);
StopIfError = sie_saved;
}
ControlFile->state = DB_IN_PRODUCTION;
ControlFile->time = time(NULL);
UpdateControlFile();
return;
}
static XLogRecord*
ReadRecord(XLogRecPtr *RecPtr, char *buffer)
{
@ -1081,6 +774,7 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
XLogRecPtr tmpRecPtr = EndRecPtr;
bool nextmode = (RecPtr == NULL);
int emode = (nextmode) ? LOG : STOP;
bool noBlck = false;
if (nextmode)
{
@ -1113,16 +807,16 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
readSeg = RecPtr->xrecoff / XLogSegSize;
if (readFile < 0)
{
readOff = (off_t) -1;
noBlck = true;
readFile = XLogFileOpen(readId, readSeg, nextmode);
if (readFile < 0)
goto next_record_is_invalid;
}
if (readOff < 0 || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
{
readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ;
if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0)
if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
readId, readSeg, readOff, errno);
if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
@ -1186,7 +880,7 @@ got_record:;
readId++;
}
close(readFile);
readOff = (off_t) 0;
readOff = 0;
readFile = XLogFileOpen(readId, readSeg, nextmode);
if (readFile < 0)
goto next_record_is_invalid;
@ -1280,7 +974,7 @@ next_record_is_invalid:;
elog(LOG, "Formating logfile %u seg %u block %u at offset %u",
readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ);
readFile = XLogFileOpen(readId, readSeg, false);
if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0)
if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
readId, readSeg, readOff, errno);
if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
@ -1288,7 +982,7 @@ next_record_is_invalid:;
readId, readSeg, readOff, errno);
memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0,
BLCKSZ - EndRecPtr.xrecoff % BLCKSZ);
if (lseek(readFile, readOff * BLCKSZ, SEEK_SET) < 0)
if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
readId, readSeg, readOff, errno);
if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
@ -1303,15 +997,17 @@ next_record_is_invalid:;
readId = tmpRecPtr.xlogid;
readSeg = tmpRecPtr.xrecoff / XLogSegSize;
readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
Assert(readOff > 0);
}
if (readOff > 0)
{
elog(LOG, "Formating logfile %u seg %u block %u at offset 0",
readId, readSeg, readOff);
if (!XLByteEQ(tmpRecPtr, EndRecPtr))
elog(LOG, "Formating logfile %u seg %u block %u at offset 0",
readId, readSeg, readOff);
readOff *= BLCKSZ;
memset(readBuf, 0, BLCKSZ);
readFile = XLogFileOpen(readId, readSeg, false);
if (lseek(readFile, readOff, SEEK_SET) < 0)
if (lseek(readFile, (off_t)readOff, SEEK_SET) < 0)
elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
readId, readSeg, readOff, errno);
while (readOff < XLogSegSize)
@ -1357,6 +1053,381 @@ next_record_is_invalid:;
return(record);
}
void
UpdateControlFile()
{
int fd;
tryAgain:
fd = open(ControlFilePath, O_RDWR);
if (fd < 0 && (errno == EMFILE || errno == ENFILE))
{
fd = errno;
if (!ReleaseDataFile())
elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
fd);
goto tryAgain;
}
if (fd < 0)
elog(STOP, "Open(cntlfile) failed: %d", errno);
if (write(fd, ControlFile, BLCKSZ) != BLCKSZ)
elog(STOP, "Write(cntlfile) failed: %d", errno);
if (fsync(fd) != 0)
elog(STOP, "Fsync(cntlfile) failed: %d", errno);
close(fd);
return;
}
int
XLOGShmemSize()
{
if (XLOGbuffers < MinXLOGbuffers)
XLOGbuffers = MinXLOGbuffers;
return(sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ);
}
void
XLOGShmemInit(void)
{
bool found;
if (XLOGbuffers < MinXLOGbuffers)
XLOGbuffers = MinXLOGbuffers;
ControlFile = (ControlFileData*)
ShmemInitStruct("Control File", BLCKSZ, &found);
Assert(!found);
XLogCtl = (XLogCtlData*)
ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
sizeof(XLogRecPtr) * XLOGbuffers, &found);
Assert(!found);
}
/*
* This func must be called ONCE on system install
*/
void
BootStrapXLOG()
{
int fd;
char buffer[BLCKSZ];
XLogPageHeader page = (XLogPageHeader)buffer;
CheckPoint checkPoint;
XLogRecord *record;
fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
if (fd < 0)
elog(STOP, "BootStrapXLOG failed to create control file (%s): %d",
ControlFilePath, errno);
logFile = XLogFileInit(0, 0);
checkPoint.redo.xlogid = 0;
checkPoint.redo.xrecoff = SizeOfXLogPHD;
checkPoint.undo = checkPoint.redo;
checkPoint.nextXid = FirstTransactionId;
checkPoint.nextOid = BootstrapObjectIdData;
memset(buffer, 0, BLCKSZ);
page->xlp_magic = XLOG_PAGE_MAGIC;
page->xlp_info = 0;
record = (XLogRecord*) ((char*)page + SizeOfXLogPHD);
record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0;
record->xl_xact_prev = record->xl_prev;
record->xl_xid = InvalidTransactionId;
record->xl_len = sizeof(checkPoint);
record->xl_info = 0;
record->xl_rmid = RM_XLOG_ID;
memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno);
if (fsync(logFile) != 0)
elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno);
close(logFile);
logFile = -1;
memset(buffer, 0, BLCKSZ);
ControlFile = (ControlFileData*) buffer;
ControlFile->logId = 0;
ControlFile->logSeg = 1;
ControlFile->checkPoint = checkPoint.redo;
ControlFile->time = time(NULL);
ControlFile->state = DB_SHUTDOWNED;
if (write(fd, buffer, BLCKSZ) != BLCKSZ)
elog(STOP, "BootStrapXLOG failed to write control file: %d", errno);
if (fsync(fd) != 0)
elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno);
close(fd);
return;
}
static char*
str_time(time_t tnow)
{
char *result = ctime(&tnow);
char *p = strchr(result, '\n');
if (p != NULL)
*p = 0;
return(result);
}
/*
* This func must be called ONCE on system startup
*/
void
StartupXLOG()
{
XLogCtlInsert *Insert;
CheckPoint checkPoint;
XLogRecPtr RecPtr,
LastRec;
XLogRecord *record;
char buffer[MAXLOGRECSZ+SizeOfXLogRecord];
int fd;
int recovery = 0;
bool sie_saved = false;
elog(LOG, "Data Base System is starting up at %s", str_time(time(NULL)));
XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData));
XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
XLogCtl->LgwrRqst = LgwrRqst;
XLogCtl->LgwrResult = LgwrResult;
XLogCtl->Insert.LgwrResult = LgwrResult;
XLogCtl->Insert.curridx = 0;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
XLogCtl->Write.LgwrResult = LgwrResult;
XLogCtl->Write.curridx = 0;
S_INIT_LOCK(&(XLogCtl->insert_lck));
S_INIT_LOCK(&(XLogCtl->info_lck));
S_INIT_LOCK(&(XLogCtl->lgwr_lck));
/*
* Open/read Control file
*/
tryAgain:
fd = open(ControlFilePath, O_RDWR);
if (fd < 0 && (errno == EMFILE || errno == ENFILE))
{
fd = errno;
if (!ReleaseDataFile())
elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
fd);
goto tryAgain;
}
if (fd < 0)
elog(STOP, "Open(cntlfile) failed: %d", errno);
if (read(fd, ControlFile, BLCKSZ) != BLCKSZ)
elog(STOP, "Read(cntlfile) failed: %d", errno);
close(fd);
if (ControlFile->logSeg == 0 ||
ControlFile->time <= 0 ||
ControlFile->state < DB_SHUTDOWNED ||
ControlFile->state > DB_IN_PRODUCTION ||
!XRecOffIsValid(ControlFile->checkPoint.xrecoff))
elog(STOP, "Control file context is broken");
if (ControlFile->state == DB_SHUTDOWNED)
elog(LOG, "Data Base System was shutdowned at %s",
str_time(ControlFile->time));
else if (ControlFile->state == DB_SHUTDOWNING)
elog(LOG, "Data Base System was interrupted when shutting down at %s",
str_time(ControlFile->time));
else if (ControlFile->state == DB_IN_RECOVERY)
{
elog(LOG, "Data Base System was interrupted being in recovery at %s\n"
"\tThis propably means that some data blocks are corrupted\n"
"\tAnd you will have to use last backup for recovery",
str_time(ControlFile->time));
}
else if (ControlFile->state == DB_IN_PRODUCTION)
elog(LOG, "Data Base System was interrupted being in production at %s",
str_time(ControlFile->time));
LastRec = RecPtr = ControlFile->checkPoint;
if (!XRecOffIsValid(RecPtr.xrecoff))
elog(STOP, "Invalid checkPoint in control file");
elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
record = ReadRecord(&RecPtr, buffer);
if (record->xl_rmid != RM_XLOG_ID)
elog(STOP, "Invalid RMID in checkPoint record");
if (record->xl_len != sizeof(checkPoint))
elog(STOP, "Invalid length of checkPoint record");
checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord));
elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)",
checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
checkPoint.undo.xlogid, checkPoint.undo.xrecoff);
elog(LOG, "NextTransactionId: %u; NextOid: %u",
checkPoint.nextXid, checkPoint.nextOid);
if (checkPoint.nextXid < FirstTransactionId ||
checkPoint.nextOid < BootstrapObjectIdData)
#ifdef XLOG
elog(STOP, "Invalid NextTransactionId/NextOid");
#else
elog(LOG, "Invalid NextTransactionId/NextOid");
#endif
#ifdef XLOG
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
#endif
if (XLByteLT(RecPtr, checkPoint.redo))
elog(STOP, "Invalid redo in checkPoint record");
if (checkPoint.undo.xrecoff == 0)
checkPoint.undo = RecPtr;
if (XLByteLT(RecPtr, checkPoint.undo))
elog(STOP, "Invalid undo in checkPoint record");
if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr))
{
if (ControlFile->state == DB_SHUTDOWNED)
elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
recovery = 2;
}
else if (ControlFile->state != DB_SHUTDOWNED)
recovery = 2;
if (recovery > 0)
{
elog(LOG, "The DataBase system was not properly shutdowned\n"
"\tAutomatic recovery is in progress...");
ControlFile->state = DB_IN_RECOVERY;
ControlFile->time = time(NULL);
UpdateControlFile();
sie_saved = StopIfError;
StopIfError = true;
/* Is REDO required ? */
if (XLByteLT(checkPoint.redo, RecPtr))
record = ReadRecord(&(checkPoint.redo), buffer);
else /* read past CheckPoint record */
record = ReadRecord(NULL, buffer);
/* REDO */
if (record->xl_len != 0)
{
elog(LOG, "Redo starts at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
do
{
#ifdef XLOG
if (record->xl_xid >= ShmemVariableCache->nextXid)
ShmemVariableCache->nextXid = record->xl_xid + 1;
#endif
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
record = ReadRecord(NULL, buffer);
} while (record->xl_len != 0);
elog(LOG, "Redo done at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
LastRec = ReadRecPtr;
}
else
{
elog(LOG, "Redo is not required");
recovery--;
}
/* UNDO */
RecPtr = ReadRecPtr;
if (XLByteLT(checkPoint.undo, RecPtr))
{
elog(LOG, "Undo starts at (%u, %u)",
RecPtr.xlogid, RecPtr.xrecoff);
do
{
record = ReadRecord(&RecPtr, buffer);
if (TransactionIdIsValid(record->xl_xid) &&
!TransactionIdDidCommit(record->xl_xid))
RmgrTable[record->xl_rmid].rm_undo(record);
RecPtr = record->xl_prev;
} while (XLByteLE(checkPoint.undo, RecPtr));
elog(LOG, "Undo done at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
}
else
{
elog(LOG, "Undo is not required");
recovery--;
}
}
/* Init xlog buffer cache */
record = ReadRecord(&LastRec, buffer);
logId = EndRecPtr.xlogid;
logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
logOff = 0;
logFile = XLogFileOpen(logId, logSeg, false);
XLogCtl->xlblocks[0].xlogid = logId;
XLogCtl->xlblocks[0].xrecoff =
((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
Insert = &XLogCtl->Insert;
memcpy((char*)(Insert->currpage), readBuf, BLCKSZ);
Insert->currpos = ((char*) Insert->currpage) +
(EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
Insert->PrevRecord = ControlFile->checkPoint;
if (recovery > 0)
{
int i;
/*
* Let resource managers know that recovery is done
*/
for (i = 0; i <= RM_MAX_ID; i++)
RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL);
CreateCheckPoint(true);
StopIfError = sie_saved;
}
ControlFile->state = DB_IN_PRODUCTION;
ControlFile->time = time(NULL);
UpdateControlFile();
elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL)));
return;
}
/*
* This func must be called ONCE on system shutdown
*/
void
ShutdownXLOG()
{
elog(LOG, "Data Base System is shutting down at %s", str_time(time(NULL)));
CreateCheckPoint(true);
elog(LOG, "Data Base System is shutdowned at %s", str_time(time(NULL)));
}
void
CreateCheckPoint(bool shutdown)
{
@ -1375,7 +1446,7 @@ CreateCheckPoint(bool shutdown)
}
/* Get REDO record ptr */
while (!TAS(&(XLogCtl->insert_lck)))
while (TAS(&(XLogCtl->insert_lck)))
{
struct timeval delay = {0, 5000};
@ -1410,6 +1481,7 @@ CreateCheckPoint(bool shutdown)
FlushBufferPool();
/* Get UNDO record ptr */
checkPoint.undo.xrecoff = 0;
if (shutdown && checkPoint.undo.xrecoff != 0)
elog(STOP, "Active transaction while data base is shutting down");