1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Per previous discussions, get rid of use of sync(2) in favor of

explicitly fsync'ing every (non-temp) file we have written since the
last checkpoint.  In the vast majority of cases, the burden of the
fsyncs should fall on the bgwriter process not on backends.  (To this
end, we assume that an fsync issued by the bgwriter will force out
blocks written to the same file by other processes using other file
descriptors.  Anyone have a problem with that?)  This makes the world
safe for WIN32, which ain't even got sync(2), and really makes the world
safe for Unixen as well, because sync(2) never had the semantics we need:
it offers no way to wait for the requested I/O to finish.

Along the way, fix a bug I recently introduced in xlog recovery:
file truncation replay failed to clear bufmgr buffers for the dropped
blocks, which could result in 'PANIC:  heap_delete_redo: no block'
later on in xlog replay.
This commit is contained in:
Tom Lane
2004-05-31 03:48:10 +00:00
parent f024086db3
commit 9b178555fc
13 changed files with 779 additions and 250 deletions

View File

@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.15 2004/05/29 22:48:18 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/slru.c,v 1.16 2004/05/31 03:47:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,6 +17,7 @@
#include <unistd.h>
#include "access/slru.h"
#include "access/clog.h" /* only for NUM_CLOG_BUFFERS */
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
#include "storage/lwlock.h"
@@ -100,6 +101,8 @@ typedef enum
*/
typedef struct SlruSharedData
{
LWLockId ControlLock;
/*
* Info for each buffer slot. Page number is undefined when status is
* EMPTY. lru_count is essentially the number of page switches since
@@ -110,6 +113,7 @@ typedef struct SlruSharedData
SlruPageStatus page_status[NUM_CLOG_BUFFERS];
int page_number[NUM_CLOG_BUFFERS];
unsigned int page_lru_count[NUM_CLOG_BUFFERS];
LWLockId BufferLocks[NUM_CLOG_BUFFERS]; /* Per-buffer I/O locks */
/*
* latest_page_number is the page number of the current end of the
@@ -118,12 +122,24 @@ typedef struct SlruSharedData
*/
int latest_page_number;
} SlruSharedData;
typedef SlruSharedData *SlruShared;
#define SlruFileName(ctl, path, seg) \
snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
/*
* During SimpleLruFlush(), we will usually not need to write/fsync more
* than one or two physical files, but we may need to write several pages
* per file. We can consolidate the I/O requests by leaving files open
* until control returns to SimpleLruFlush(). This data structure remembers
* which files are open.
*/
typedef struct SlruFlushData
{
int num_files; /* # files actually open */
int fd[NUM_CLOG_BUFFERS]; /* their FD's */
int segno[NUM_CLOG_BUFFERS]; /* their clog seg#s */
} SlruFlushData;
/*
* Macro to mark a buffer slot "most recently used".
*/
@@ -145,14 +161,17 @@ typedef enum
SLRU_SEEK_FAILED,
SLRU_READ_FAILED,
SLRU_WRITE_FAILED,
SLRU_FSYNC_FAILED,
SLRU_CLOSE_FAILED
} SlruErrorCause;
static SlruErrorCause slru_errcause;
static int slru_errno;
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
SlruFlush fdata);
static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
static bool SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions);
@@ -165,24 +184,16 @@ static bool SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions);
int
SimpleLruShmemSize(void)
{
return MAXALIGN(sizeof(SlruSharedData))
+ BLCKSZ * NUM_CLOG_BUFFERS
+ MAXALIGN(sizeof(SlruLockData))
;
return MAXALIGN(sizeof(SlruSharedData)) + BLCKSZ * NUM_CLOG_BUFFERS;
}
void
SimpleLruInit(SlruCtl ctl, const char *name, const char *subdir)
{
bool found;
char *ptr;
SlruShared shared;
SlruLock locks;
bool found;
ptr = ShmemInitStruct(name, SimpleLruShmemSize(), &found);
shared = (SlruShared) ptr;
locks = (SlruLock) (ptr + MAXALIGN(sizeof(SlruSharedData)) +
BLCKSZ * NUM_CLOG_BUFFERS);
shared = (SlruShared) ShmemInitStruct(name, SimpleLruShmemSize(), &found);
if (!IsUnderPostmaster)
{
@@ -192,18 +203,18 @@ SimpleLruInit(SlruCtl ctl, const char *name, const char *subdir)
Assert(!found);
locks->ControlLock = LWLockAssign();
memset(shared, 0, sizeof(SlruSharedData));
shared->ControlLock = LWLockAssign();
bufptr = (char *) shared + MAXALIGN(sizeof(SlruSharedData));
for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
{
locks->BufferLocks[slotno] = LWLockAssign();
shared->page_buffer[slotno] = bufptr;
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
shared->page_lru_count[slotno] = 1;
shared->BufferLocks[slotno] = LWLockAssign();
bufptr += BLCKSZ;
}
@@ -213,10 +224,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, const char *subdir)
Assert(found);
/* Initialize the unshared control struct */
ctl->locks = locks;
ctl->shared = shared;
ctl->ControlLock = shared->ControlLock;
/* Init directory path */
/* Initialize unshared copy of directory path */
snprintf(ctl->Dir, MAXPGPATH, "%s/%s", DataDir, subdir);
}
@@ -232,7 +243,7 @@ int
SimpleLruZeroPage(SlruCtl ctl, int pageno)
{
int slotno;
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
/* Find a suitable buffer slot for the page */
slotno = SlruSelectLRUPage(ctl, pageno);
@@ -270,7 +281,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
char *
SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
{
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
/* Outer loop handles restart if we lose the buffer to someone else */
for (;;)
@@ -313,8 +324,8 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
SlruRecentlyUsed(shared, slotno);
/* Release shared lock, grab per-buffer lock instead */
LWLockRelease(ctl->locks->ControlLock);
LWLockAcquire(ctl->locks->BufferLocks[slotno], LW_EXCLUSIVE);
LWLockRelease(shared->ControlLock);
LWLockAcquire(shared->BufferLocks[slotno], LW_EXCLUSIVE);
/*
* Check to see if someone else already did the read, or took the
@@ -323,8 +334,8 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
if (shared->page_number[slotno] != pageno ||
shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
{
LWLockRelease(ctl->locks->BufferLocks[slotno]);
LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
LWLockRelease(shared->BufferLocks[slotno]);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
continue;
}
@@ -332,14 +343,14 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
/* Re-acquire shared control lock and update page state */
LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS);
shared->page_status[slotno] = ok ? SLRU_PAGE_CLEAN : SLRU_PAGE_EMPTY;
LWLockRelease(ctl->locks->BufferLocks[slotno]);
LWLockRelease(shared->BufferLocks[slotno]);
/* Now it's okay to ereport if we failed */
if (!ok)
@@ -364,11 +375,11 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, TransactionId xid, bool forwrite)
* Control lock must be held at entry, and will be held at exit.
*/
void
SimpleLruWritePage(SlruCtl ctl, int slotno)
SimpleLruWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
{
int pageno;
bool ok;
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
/* Do nothing if page does not need writing */
if (shared->page_status[slotno] != SLRU_PAGE_DIRTY &&
@@ -378,8 +389,8 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
pageno = shared->page_number[slotno];
/* Release shared lock, grab per-buffer lock instead */
LWLockRelease(ctl->locks->ControlLock);
LWLockAcquire(ctl->locks->BufferLocks[slotno], LW_EXCLUSIVE);
LWLockRelease(shared->ControlLock);
LWLockAcquire(shared->BufferLocks[slotno], LW_EXCLUSIVE);
/*
* Check to see if someone else already did the write, or took the
@@ -392,8 +403,8 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
(shared->page_status[slotno] != SLRU_PAGE_DIRTY &&
shared->page_status[slotno] != SLRU_PAGE_WRITE_IN_PROGRESS))
{
LWLockRelease(ctl->locks->BufferLocks[slotno]);
LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
LWLockRelease(shared->BufferLocks[slotno]);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
return;
}
@@ -412,10 +423,19 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
/* Okay, do the write */
ok = SlruPhysicalWritePage(ctl, pageno, slotno);
ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
/* If we failed, and we're in a flush, better close the files */
if (!ok && fdata)
{
int i;
for (i = 0; i < fdata->num_files; i++)
close(fdata->fd[i]);
}
/* Re-acquire shared control lock and update page state */
LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
(shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS ||
@@ -425,7 +445,7 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
if (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
shared->page_status[slotno] = ok ? SLRU_PAGE_CLEAN : SLRU_PAGE_DIRTY;
LWLockRelease(ctl->locks->BufferLocks[slotno]);
LWLockRelease(shared->BufferLocks[slotno]);
/* Now it's okay to ereport if we failed */
if (!ok)
@@ -445,7 +465,7 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
static bool
SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
@@ -482,6 +502,7 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
slru_errcause = SLRU_SEEK_FAILED;
slru_errno = errno;
close(fd);
return false;
}
@@ -490,6 +511,7 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
slru_errcause = SLRU_READ_FAILED;
slru_errno = errno;
close(fd);
return false;
}
@@ -511,50 +533,80 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
* info in static variables to let SlruReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across
* read/write operations. We could cache one virtual file pointer ...
* independent read/write operations. We do batch operations during
* SimpleLruFlush, though.
*
* fdata is NULL for a standalone write, pointer to open-file info during
* SimpleLruFlush.
*/
static bool
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno)
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
{
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd;
SlruFileName(ctl, path, segno);
int fd = -1;
/*
* If the file doesn't already exist, we should create it. It is
* possible for this to need to happen when writing a page that's not
* first in its segment; we assume the OS can cope with that. (Note:
* it might seem that it'd be okay to create files only when
* SimpleLruZeroPage is called for the first page of a segment.
* However, if after a crash and restart the REDO logic elects to
* replay the log from a checkpoint before the latest one, then it's
* possible that we will get commands to set transaction status of
* transactions that have already been truncated from the commit log.
* Easiest way to deal with that is to accept references to
* nonexistent files here and in SlruPhysicalReadPage.)
* During a Flush, we may already have the desired file open.
*/
fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fdata)
{
int i;
for (i = 0; i < fdata->num_files; i++)
{
if (fdata->segno[i] == segno)
{
fd = fdata->fd[i];
break;
}
}
}
if (fd < 0)
{
if (errno != ENOENT)
{
slru_errcause = SLRU_OPEN_FAILED;
slru_errno = errno;
return false;
}
fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
S_IRUSR | S_IWUSR);
/*
* If the file doesn't already exist, we should create it. It is
* possible for this to need to happen when writing a page that's not
* first in its segment; we assume the OS can cope with that.
* (Note: it might seem that it'd be okay to create files only when
* SimpleLruZeroPage is called for the first page of a segment.
* However, if after a crash and restart the REDO logic elects to
* replay the log from a checkpoint before the latest one, then it's
* possible that we will get commands to set transaction status of
* transactions that have already been truncated from the commit log.
* Easiest way to deal with that is to accept references to
* nonexistent files here and in SlruPhysicalReadPage.)
*/
SlruFileName(ctl, path, segno);
fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
{
slru_errcause = SLRU_CREATE_FAILED;
slru_errno = errno;
return false;
if (errno != ENOENT)
{
slru_errcause = SLRU_OPEN_FAILED;
slru_errno = errno;
return false;
}
fd = BasicOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
S_IRUSR | S_IWUSR);
if (fd < 0)
{
slru_errcause = SLRU_CREATE_FAILED;
slru_errno = errno;
return false;
}
}
if (fdata)
{
fdata->fd[fdata->num_files] = fd;
fdata->segno[fdata->num_files] = segno;
fdata->num_files++;
}
}
@@ -562,6 +614,8 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno)
{
slru_errcause = SLRU_SEEK_FAILED;
slru_errno = errno;
if (!fdata)
close(fd);
return false;
}
@@ -573,14 +627,31 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno)
errno = ENOSPC;
slru_errcause = SLRU_WRITE_FAILED;
slru_errno = errno;
if (!fdata)
close(fd);
return false;
}
if (close(fd))
/*
* If not part of Flush, need to fsync now. We assume this happens
* infrequently enough that it's not a performance issue.
*/
if (!fdata)
{
slru_errcause = SLRU_CLOSE_FAILED;
slru_errno = errno;
return false;
if (pg_fsync(fd))
{
slru_errcause = SLRU_FSYNC_FAILED;
slru_errno = errno;
close(fd);
return false;
}
if (close(fd))
{
slru_errcause = SLRU_CLOSE_FAILED;
slru_errno = errno;
return false;
}
}
return true;
@@ -637,6 +708,13 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
errdetail("could not write to file \"%s\" at offset %u: %m",
path, offset)));
break;
case SLRU_FSYNC_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("could not fsync file \"%s\": %m",
path)));
break;
case SLRU_CLOSE_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
@@ -668,7 +746,7 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
static int
SlruSelectLRUPage(SlruCtl ctl, int pageno)
{
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
/* Outer loop handles restart after I/O */
for (;;)
@@ -717,7 +795,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
(void) SimpleLruReadPage(ctl, shared->page_number[bestslot],
InvalidTransactionId, false);
else
SimpleLruWritePage(ctl, bestslot);
SimpleLruWritePage(ctl, bestslot, NULL);
/*
* Now loop back and try again. This is the easiest way of
@@ -733,7 +811,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
void
SimpleLruSetLatestPage(SlruCtl ctl, int pageno)
{
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
shared->latest_page_number = pageno;
}
@@ -744,16 +822,20 @@ SimpleLruSetLatestPage(SlruCtl ctl, int pageno)
void
SimpleLruFlush(SlruCtl ctl, bool checkpoint)
{
#ifdef USE_ASSERT_CHECKING /* only used in Assert() */
SlruShared shared = (SlruShared) ctl->shared;
#endif
SlruShared shared = ctl->shared;
SlruFlushData fdata;
int slotno;
int pageno = 0;
int i;
bool ok;
LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
fdata.num_files = 0;
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
for (slotno = 0; slotno < NUM_CLOG_BUFFERS; slotno++)
{
SimpleLruWritePage(ctl, slotno);
SimpleLruWritePage(ctl, slotno, &fdata);
/*
* When called during a checkpoint, we cannot assert that the slot
@@ -765,7 +847,32 @@ SimpleLruFlush(SlruCtl ctl, bool checkpoint)
shared->page_status[slotno] == SLRU_PAGE_CLEAN);
}
LWLockRelease(ctl->locks->ControlLock);
LWLockRelease(shared->ControlLock);
/*
* Now fsync and close any files that were open
*/
ok = true;
for (i = 0; i < fdata.num_files; i++)
{
if (pg_fsync(fdata.fd[i]))
{
slru_errcause = SLRU_FSYNC_FAILED;
slru_errno = errno;
pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
ok = false;
}
if (close(fdata.fd[i]))
{
slru_errcause = SLRU_CLOSE_FAILED;
slru_errno = errno;
pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
ok = false;
}
}
if (!ok)
SlruReportIOError(ctl, pageno, InvalidTransactionId);
}
/*
@@ -786,7 +893,7 @@ void
SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
{
int slotno;
SlruShared shared = (SlruShared) ctl->shared;
SlruShared shared = ctl->shared;
/*
* The cutoff point is the start of the segment containing cutoffPage.
@@ -805,7 +912,7 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
* have been flushed already during the checkpoint, we're just being
* extra careful here.)
*/
LWLockAcquire(ctl->locks->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
restart:;
@@ -817,7 +924,7 @@ restart:;
*/
if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
{
LWLockRelease(ctl->locks->ControlLock);
LWLockRelease(shared->ControlLock);
ereport(LOG,
(errmsg("could not truncate directory \"%s\": apparent wraparound",
ctl->Dir)));
@@ -849,11 +956,11 @@ restart:;
(void) SimpleLruReadPage(ctl, shared->page_number[slotno],
InvalidTransactionId, false);
else
SimpleLruWritePage(ctl, slotno);
SimpleLruWritePage(ctl, slotno, NULL);
goto restart;
}
LWLockRelease(ctl->locks->ControlLock);
LWLockRelease(shared->ControlLock);
/* Now we can remove the old segment(s) */
(void) SlruScanDirectory(ctl, cutoffPage, true);
@@ -878,7 +985,8 @@ SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions)
if (cldir == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open directory \"%s\": %m", ctl->Dir)));
errmsg("could not open directory \"%s\": %m",
ctl->Dir)));
errno = 0;
while ((clde = readdir(cldir)) != NULL)