1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Support an optional asynchronous commit mode, in which we don't flush WAL

before reporting a transaction committed.  Data consistency is still
guaranteed (unlike setting fsync = off), but a crash may lose the effects
of the last few transactions.  Patch by Simon, some editorialization by Tom.
This commit is contained in:
Tom Lane
2007-08-01 22:45:09 +00:00
parent c722628a43
commit 4a78cdeb6b
25 changed files with 998 additions and 303 deletions

View File

@@ -41,7 +41,7 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.40 2007/01/05 22:19:23 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.41 2007/08/01 22:45:07 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -140,6 +140,8 @@ static SlruErrorCause slru_errcause;
static int slru_errno;
static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
SlruFlush fdata);
@@ -152,7 +154,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
*/
Size
SimpleLruShmemSize(int nslots)
SimpleLruShmemSize(int nslots, int nlsns)
{
Size sz;
@@ -165,18 +167,21 @@ SimpleLruShmemSize(int nslots)
sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
sz += MAXALIGN(nslots * sizeof(LWLockId)); /* buffer_locks[] */
if (nlsns > 0)
sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */
return BUFFERALIGN(sz) + BLCKSZ * nslots;
}
void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots,
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLockId ctllock, const char *subdir)
{
SlruShared shared;
bool found;
shared = (SlruShared) ShmemInitStruct(name,
SimpleLruShmemSize(nslots),
SimpleLruShmemSize(nslots, nlsns),
&found);
if (!IsUnderPostmaster)
@@ -193,6 +198,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots,
shared->ControlLock = ctllock;
shared->num_slots = nslots;
shared->lsn_groups_per_page = nlsns;
shared->cur_lru_count = 0;
@@ -212,8 +218,14 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots,
offset += MAXALIGN(nslots * sizeof(int));
shared->buffer_locks = (LWLockId *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(LWLockId));
ptr += BUFFERALIGN(offset);
if (nlsns > 0)
{
shared->group_lsn = (XLogRecPtr *) (ptr + offset);
offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
}
ptr += BUFFERALIGN(offset);
for (slotno = 0; slotno < nslots; slotno++)
{
shared->page_buffer[slotno] = ptr;
@@ -266,15 +278,37 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
/* Set the buffer to zeroes */
MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
/* Set the LSNs for this new page to zero */
SimpleLruZeroLSNs(ctl, slotno);
/* Assume this page is now the latest active page */
shared->latest_page_number = pageno;
return slotno;
}
/*
* Zero all the LSNs we store for this slru page.
*
* This should be called each time we create a new page, and each time we read
* in a page from disk into an existing buffer. (Such an old page cannot
* have any interesting LSNs, since we'd have flushed them before writing
* the page in the first place.)
*/
static void
SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
{
SlruShared shared = ctl->shared;
if (shared->lsn_groups_per_page > 0)
MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
shared->lsn_groups_per_page * sizeof(XLogRecPtr));
}
/*
* Wait for any active I/O on a page slot to finish. (This does not
* guarantee that new I/O hasn't been started before we return, though.)
* guarantee that new I/O hasn't been started before we return, though.
* In fact the slot might not even contain the same page anymore.)
*
* Control lock must be held at entry, and will be held at exit.
*/
@@ -305,8 +339,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
/* indeed, the I/O must have failed */
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
else
/* write_in_progress */
else /* write_in_progress */
{
shared->page_status[slotno] = SLRU_PAGE_VALID;
shared->page_dirty[slotno] = true;
@@ -320,6 +353,11 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Find a page in a shared buffer, reading it in if necessary.
* The page number must correspond to an already-initialized page.
*
* If write_ok is true then it is OK to return a page that is in
* WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
* that modification of the page is safe. If write_ok is false then we
* will not return the page until it is not undergoing active I/O.
*
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
*
@@ -329,7 +367,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Control lock must be held at entry, and will be held at exit.
*/
int
SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid)
SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
TransactionId xid)
{
SlruShared shared = ctl->shared;
@@ -346,8 +385,13 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid)
if (shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_EMPTY)
{
/* If page is still being read in, we must wait for I/O */
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
/*
* If page is still being read in, we must wait for I/O. Likewise
* if the page is being written and the caller said that's not OK.
*/
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
(shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
!write_ok))
{
SimpleLruWaitIO(ctl, slotno);
/* Now we must recheck state from the top */
@@ -383,6 +427,9 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid)
/* Do the read */
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
/* Set the LSNs for this newly read-in page to zero */
SimpleLruZeroLSNs(ctl, slotno);
/* Re-acquire control lock and update page state */
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
@@ -443,7 +490,7 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
LWLockRelease(shared->ControlLock);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
return SimpleLruReadPage(ctl, pageno, xid);
return SimpleLruReadPage(ctl, pageno, true, xid);
}
/*
@@ -621,6 +668,47 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
char path[MAXPGPATH];
int fd = -1;
/*
* Honor the write-WAL-before-data rule, if appropriate, so that we do
* not write out data before associated WAL records. This is the same
* action performed during FlushBuffer() in the main buffer manager.
*/
if (shared->group_lsn != NULL)
{
/*
* We must determine the largest async-commit LSN for the page.
* This is a bit tedious, but since this entire function is a slow
* path anyway, it seems better to do this here than to maintain
* a per-page LSN variable (which'd need an extra comparison in the
* transaction-commit path).
*/
XLogRecPtr max_lsn;
int lsnindex, lsnoff;
lsnindex = slotno * shared->lsn_groups_per_page;
max_lsn = shared->group_lsn[lsnindex++];
for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
{
XLogRecPtr this_lsn = shared->group_lsn[lsnindex++];
if (XLByteLT(max_lsn, this_lsn))
max_lsn = this_lsn;
}
if (!XLogRecPtrIsInvalid(max_lsn))
{
/*
* As noted above, elog(ERROR) is not acceptable here, so if
* XLogFlush were to fail, we must PANIC. This isn't much of
* a restriction because XLogFlush is just about all critical
* section anyway, but let's make sure.
*/
START_CRIT_SECTION();
XLogFlush(max_lsn);
END_CRIT_SECTION();
}
}
/*
* During a Flush, we may already have the desired file open.
*/