1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Rearrange pg_subtrans handling as per recent discussion. pg_subtrans

updates are no longer WAL-logged nor even fsync'd; we do not need to,
since after a crash no old pg_subtrans data is needed again.  We truncate
pg_subtrans to RecentGlobalXmin at each checkpoint.  slru.c's API is
refactored a little bit to separate out the necessary decisions.
This commit is contained in:
Tom Lane
2004-08-23 23:22:45 +00:00
parent 059912ce2e
commit 4dbb880d3c
13 changed files with 387 additions and 436 deletions

View File

@ -10,29 +10,34 @@
* looked up again. Now we use specialized access code so that the commit
* log can be broken into relatively small, independent segments.
*
* XLOG interactions: this module generates an XLOG record whenever a new
* CLOG page is initialized to zeroes. Other writes of CLOG come from
* recording of transaction commit or abort in xact.c, which generates its
* own XLOG records for these events and will re-perform the status update
* on redo; so we need make no additional XLOG entry here. Also, the XLOG
* is guaranteed flushed through the XLOG commit record before we are called
* to log a commit, so the WAL rule "write xlog before data" is satisfied
* automatically for commits, and we don't really care for aborts. Therefore,
* we don't need to mark CLOG pages with LSN information; we have enough
* synchronization already.
*
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.22 2004/07/03 02:55:56 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/clog.c,v 1.23 2004/08/23 23:22:44 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <fcntl.h>
#include <dirent.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/clog.h"
#include "access/slru.h"
#include "miscadmin.h"
#include "storage/lwlock.h"
#include "postmaster/bgwriter.h"
/*
* Defines for CLOG page and segment sizes. A page is the same BLCKSZ
* as is used everywhere else in Postgres.
* Defines for CLOG page sizes. A page is the same BLCKSZ as is used
* everywhere else in Postgres.
*
* Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
* CLOG page numbering also wraps around at 0xFFFFFFFF/CLOG_XACTS_PER_PAGE,
@ -53,25 +58,11 @@
#define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
/*----------
* Shared-memory data structures for CLOG control
*
* XLOG interactions: this module generates an XLOG record whenever a new
* CLOG page is initialized to zeroes. Other writes of CLOG come from
* recording of transaction commit or abort in xact.c, which generates its
* own XLOG records for these events and will re-perform the status update
* on redo; so we need make no additional XLOG entry here. Also, the XLOG
* is guaranteed flushed through the XLOG commit record before we are called
* to log a commit, so the WAL rule "write xlog before data" is satisfied
* automatically for commits, and we don't really care for aborts. Therefore,
* we don't need to mark CLOG pages with LSN information; we have enough
* synchronization already.
*----------
/*
* Link to shared-memory data structures for CLOG control
*/
static SlruCtlData ClogCtlData;
static SlruCtl ClogCtl = &ClogCtlData;
#define ClogCtl (&ClogCtlData)
static int ZeroCLOGPage(int pageno, bool writeXlog);
@ -91,6 +82,7 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status)
int pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
int slotno;
char *byteptr;
char byteval;
@ -98,10 +90,10 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status)
status == TRANSACTION_STATUS_ABORTED ||
status == TRANSACTION_STATUS_SUB_COMMITTED);
LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
byteptr = SimpleLruReadPage(ClogCtl, pageno, xid, true);
byteptr += byteno;
slotno = SimpleLruReadPage(ClogCtl, pageno, xid);
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
/* Current state should be 0, subcommitted or target state */
Assert(((*byteptr >> bshift) & CLOG_XACT_BITMASK) == 0 ||
@ -114,9 +106,9 @@ TransactionIdSetStatus(TransactionId xid, XidStatus status)
byteval |= (status << bshift);
*byteptr = byteval;
/* ...->page_status[slotno] = SLRU_PAGE_DIRTY; already done */
ClogCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;
LWLockRelease(ClogCtl->ControlLock);
LWLockRelease(CLogControlLock);
}
/*
@ -131,17 +123,18 @@ TransactionIdGetStatus(TransactionId xid)
int pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
int slotno;
char *byteptr;
XidStatus status;
LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
byteptr = SimpleLruReadPage(ClogCtl, pageno, xid, false);
byteptr += byteno;
slotno = SimpleLruReadPage(ClogCtl, pageno, xid);
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
LWLockRelease(ClogCtl->ControlLock);
LWLockRelease(CLogControlLock);
return status;
}
@ -160,8 +153,8 @@ CLOGShmemSize(void)
void
CLOGShmemInit(void)
{
SimpleLruInit(ClogCtl, "CLOG Ctl", "pg_clog");
ClogCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(ClogCtl, "CLOG Ctl", CLogControlLock, "pg_clog");
}
/*
@ -175,16 +168,16 @@ BootStrapCLOG(void)
{
int slotno;
LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
/* Create and zero the first page of the commit log */
slotno = ZeroCLOGPage(0, false);
/* Make sure it's written out */
SimpleLruWritePage(ClogCtl, slotno, NULL);
/* Assert(ClogCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
Assert(ClogCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);
LWLockRelease(ClogCtl->ControlLock);
LWLockRelease(CLogControlLock);
}
/*
@ -199,7 +192,9 @@ BootStrapCLOG(void)
static int
ZeroCLOGPage(int pageno, bool writeXlog)
{
int slotno = SimpleLruZeroPage(ClogCtl, pageno);
int slotno;
slotno = SimpleLruZeroPage(ClogCtl, pageno);
if (writeXlog)
WriteZeroPageXlogRec(pageno);
@ -217,8 +212,7 @@ StartupCLOG(void)
/*
* Initialize our idea of the latest page number.
*/
SimpleLruSetLatestPage(ClogCtl,
TransactionIdToPage(ShmemVariableCache->nextXid));
ClogCtl->shared->latest_page_number = TransactionIdToPage(ShmemVariableCache->nextXid);
}
/*
@ -227,6 +221,7 @@ StartupCLOG(void)
void
ShutdownCLOG(void)
{
/* Flush dirty CLOG pages to disk */
SimpleLruFlush(ClogCtl, false);
}
@ -236,6 +231,7 @@ ShutdownCLOG(void)
void
CheckPointCLOG(void)
{
/* Flush dirty CLOG pages to disk */
SimpleLruFlush(ClogCtl, true);
}
@ -263,12 +259,12 @@ ExtendCLOG(TransactionId newestXact)
pageno = TransactionIdToPage(newestXact);
LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCLOGPage(pageno, true);
LWLockRelease(ClogCtl->ControlLock);
LWLockRelease(CLogControlLock);
}
@ -296,6 +292,15 @@ TruncateCLOG(TransactionId oldestXact)
* We pass the *page* containing oldestXact to SimpleLruTruncate.
*/
cutoffPage = TransactionIdToPage(oldestXact);
/* Check to see if there's any files that could be removed */
if (!SlruScanDirectory(ClogCtl, cutoffPage, false))
return; /* nothing to remove */
/* Perform a CHECKPOINT */
RequestCheckpoint(true);
/* Now we can remove the old CLOG segment(s) */
SimpleLruTruncate(ClogCtl, cutoffPage);
}
@ -340,20 +345,51 @@ WriteZeroPageXlogRec(int pageno)
rdata.data = (char *) (&pageno);
rdata.len = sizeof(int);
rdata.next = NULL;
(void) XLogInsert(RM_SLRU_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
(void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE | XLOG_NO_TRAN, &rdata);
}
/* Redo a ZEROPAGE action during WAL replay */
/*
* CLOG resource manager's routines
*/
void
clog_zeropage_redo(int pageno)
clog_redo(XLogRecPtr lsn, XLogRecord *record)
{
int slotno;
uint8 info = record->xl_info & ~XLR_INFO_MASK;
LWLockAcquire(ClogCtl->ControlLock, LW_EXCLUSIVE);
if (info == CLOG_ZEROPAGE)
{
int pageno;
int slotno;
slotno = ZeroCLOGPage(pageno, false);
SimpleLruWritePage(ClogCtl, slotno, NULL);
/* Assert(ClogCtl->page_status[slotno] == SLRU_PAGE_CLEAN); */
memcpy(&pageno, XLogRecGetData(record), sizeof(int));
LWLockRelease(ClogCtl->ControlLock);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
slotno = ZeroCLOGPage(pageno, false);
SimpleLruWritePage(ClogCtl, slotno, NULL);
Assert(ClogCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);
LWLockRelease(CLogControlLock);
}
}
void
clog_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
void
clog_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == CLOG_ZEROPAGE)
{
int pageno;
memcpy(&pageno, rec, sizeof(int));
sprintf(buf + strlen(buf), "zeropage: %d", pageno);
}
else
strcat(buf, "UNKNOWN");
}