1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-29 10:41:53 +03:00

Implement new 'lightweight lock manager' that's intermediate between

existing lock manager and spinlocks: it understands exclusive vs shared
lock but has few other fancy features.  Replace most uses of spinlocks
with lightweight locks.  All remaining uses of spinlocks have very short
lock hold times (a few dozen instructions), so tweak spinlock backoff
code to work efficiently given this assumption.  All per my proposal on
pghackers 26-Sep-01.
This commit is contained in:
Tom Lane
2001-09-29 04:02:27 +00:00
parent 818fb55ac4
commit 499abb0c0f
46 changed files with 1595 additions and 1355 deletions

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.77 2001/09/26 20:24:02 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.78 2001/09/29 04:02:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -33,11 +33,11 @@
#include "access/xlogutils.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "storage/sinval.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "storage/bufpage.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/relcache.h"
#include "utils/selfuncs.h"
@ -86,11 +86,6 @@
#endif
/* Max time to wait to acquire XLog activity locks */
#define XLOG_LOCK_TIMEOUT (5*60*1000000) /* 5 minutes */
/* Max time to wait to acquire checkpoint lock */
#define CHECKPOINT_LOCK_TIMEOUT (20*60*1000000) /* 20 minutes */
/* User-settable parameters */
int CheckPointSegments = 3;
int XLOGbuffers = 8;
@ -155,13 +150,10 @@ static XLogRecPtr ProcLastRecPtr = {0, 0};
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
* hold the Insert spinlock). See XLogInsert for details.
* hold the Insert lock). See XLogInsert for details.
*/
static XLogRecPtr RedoRecPtr;
/* This lock must be held to read/update control file or create new log file */
SPINLOCK ControlFileLockId;
/*----------
* Shared-memory data structures for XLOG control
*
@ -171,24 +163,24 @@ SPINLOCK ControlFileLockId;
* These structs are identical but are declared separately to indicate their
* slightly different functions.
*
* We do a lot of pushups to minimize the amount of access to spinlocked
* We do a lot of pushups to minimize the amount of access to lockable
* shared memory values. There are actually three shared-memory copies of
* LogwrtResult, plus one unshared copy in each backend. Here's how it works:
* XLogCtl->LogwrtResult is protected by info_lck
* XLogCtl->Write.LogwrtResult is protected by logwrt_lck
* XLogCtl->Insert.LogwrtResult is protected by insert_lck
* One must hold the associated spinlock to read or write any of these, but
* of course no spinlock is needed to read/write the unshared LogwrtResult.
* XLogCtl->Write.LogwrtResult is protected by WALWriteLock
* XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
* One must hold the associated lock to read or write any of these, but
* of course no lock is needed to read/write the unshared LogwrtResult.
*
* XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
* right", since both are updated by a write or flush operation before
* it releases logwrt_lck. The point of keeping XLogCtl->Write.LogwrtResult
* is that it can be examined/modified by code that already holds logwrt_lck
* it releases WALWriteLock. The point of keeping XLogCtl->Write.LogwrtResult
* is that it can be examined/modified by code that already holds WALWriteLock
* without needing to grab info_lck as well.
*
* XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
* but is updated when convenient. Again, it exists for the convenience of
* code that is already holding insert_lck but not the other locks.
* code that is already holding WALInsertLock but not the other locks.
*
* The unshared LogwrtResult may lag behind any or all of these, and again
* is updated when convenient.
@ -199,6 +191,24 @@ SPINLOCK ControlFileLockId;
* Note that this all works because the request and result positions can only
* advance forward, never back up, and so we can easily determine which of two
* values is "more up to date".
*
* info_lck is only held long enough to read/update the protected variables,
* so it's a plain spinlock. The other locks are held longer (potentially
* over I/O operations), so we use LWLocks for them. These locks are:
*
* WALInsertLock: must be held to insert a record into the WAL buffers.
*
* WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
* XLogFlush).
*
* ControlFileLock: must be held to read/update control file or create
* new log file.
*
* CheckpointLock: must be held to do a checkpoint (ensures only one
* checkpointer at a time; even though the postmaster won't launch
* parallel checkpoint processes, we need this because manual checkpoints
* could be launched simultaneously).
*
*----------
*/
typedef struct XLogwrtRqst
@ -240,18 +250,18 @@ typedef struct XLogCtlWrite
*/
typedef struct XLogCtlData
{
/* Protected by insert_lck: */
/* Protected by WALInsertLock: */
XLogCtlInsert Insert;
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst;
XLogwrtResult LogwrtResult;
/* Protected by logwrt_lck: */
/* Protected by WALWriteLock: */
XLogCtlWrite Write;
/*
* These values do not change after startup, although the pointed-to
* pages and xlblocks values certainly do. Permission to read/write
* the pages and xlblocks values depends on insert_lck and logwrt_lck.
* pages and xlblocks values certainly do. Permission to read/write the
* pages and xlblocks values depends on WALInsertLock and WALWriteLock.
*/
char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
@ -259,13 +269,10 @@ typedef struct XLogCtlData
uint32 XLogCacheBlck; /* highest allocated xlog buffer index */
StartUpID ThisStartUpID;
/* This value is not protected by *any* spinlock... */
/* This value is not protected by *any* lock... */
XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */
slock_t insert_lck; /* XLogInsert lock */
slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */
slock_t logwrt_lck; /* XLogWrite/XLogFlush lock */
slock_t chkp_lck; /* checkpoint lock */
} XLogCtlData;
static XLogCtlData *XLogCtl = NULL;
@ -473,7 +480,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
uint32 len,
write_len;
unsigned i;
bool do_logwrt;
XLogwrtRqst LogwrtRqst;
bool updrqst;
bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
@ -505,7 +512,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
*
* We may have to loop back to here if a race condition is detected
* below. We could prevent the race by doing all this work while
* holding the insert spinlock, but it seems better to avoid doing CRC
* holding the insert lock, but it seems better to avoid doing CRC
* calculations while holding the lock. This means we have to be
* careful about modifying the rdata list until we know we aren't
* going to loop back again. The only change we allow ourselves to
@ -607,48 +614,33 @@ begin:;
START_CRIT_SECTION();
/* wait to obtain xlog insert lock */
do_logwrt = true;
/* update LogwrtResult before doing cache fill check */
SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
LogwrtRqst = XLogCtl->LogwrtRqst;
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
for (i = 0;;)
/*
* If cache is half filled then try to acquire write lock and
* do XLogWrite. Ignore any fractional blocks in performing this check.
*/
LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
(LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
XLogCtl->XLogCacheByte / 2))
{
/* try to update LogwrtResult while waiting for insert lock */
if (!TAS(&(XLogCtl->info_lck)))
if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
{
XLogwrtRqst LogwrtRqst;
LogwrtRqst = XLogCtl->LogwrtRqst;
LogwrtResult = XLogCtl->LogwrtResult;
S_UNLOCK(&(XLogCtl->info_lck));
/*
* If cache is half filled then try to acquire logwrt lock and
* do LOGWRT work, but only once per XLogInsert call. Ignore
* any fractional blocks in performing this check.
*/
LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
if (do_logwrt &&
(LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
(LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
XLogCtl->XLogCacheByte / 2)))
{
if (!TAS(&(XLogCtl->logwrt_lck)))
{
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
{
XLogWrite(LogwrtRqst);
do_logwrt = false;
}
S_UNLOCK(&(XLogCtl->logwrt_lck));
}
}
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
XLogWrite(LogwrtRqst);
LWLockRelease(WALWriteLock);
}
if (!TAS(&(XLogCtl->insert_lck)))
break;
S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT);
}
/* Now wait to get insert lock */
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to
* go back and recompute everything. This can only happen just after
@ -667,12 +659,11 @@ begin:;
if (dtbuf_bkp[i] == false &&
XLByteLE(dtbuf_lsn[i], RedoRecPtr))
{
/*
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
S_UNLOCK(&(XLogCtl->insert_lck));
LWLockRelease(WALInsertLock);
END_CRIT_SECTION();
goto begin;
}
@ -751,9 +742,9 @@ begin:;
/* If first XLOG record of transaction, save it in PROC array */
if (MyLastRecPtr.xrecoff == 0 && !no_tran)
{
SpinAcquire(SInvalLock);
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
MyProc->logRec = RecPtr;
SpinRelease(SInvalLock);
LWLockRelease(SInvalLock);
}
if (XLOG_DEBUG)
@ -837,17 +828,17 @@ begin:;
curridx = PrevBufIdx(curridx);
WriteRqst = XLogCtl->xlblocks[curridx];
S_UNLOCK(&(XLogCtl->insert_lck));
LWLockRelease(WALInsertLock);
if (updrqst)
{
S_LOCK(&(XLogCtl->info_lck));
SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
/* advance global request to include new block(s) */
if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst))
XLogCtl->LogwrtRqst.Write = WriteRqst;
/* update local result copy while I have the chance */
LogwrtResult = XLogCtl->LogwrtResult;
S_UNLOCK(&(XLogCtl->info_lck));
SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
}
END_CRIT_SECTION();
@ -859,11 +850,11 @@ begin:;
* buffer if it still contains unwritten data.
*
* The global LogwrtRqst.Write pointer needs to be advanced to include the
* just-filled page. If we can do this for free (without an extra spinlock),
* just-filled page. If we can do this for free (without an extra lock),
* we do so here. Otherwise the caller must do it. We return TRUE if the
* request update still needs to be done, FALSE if we did it internally.
*
* Must be called with insert_lck held.
* Must be called with WALInsertLock held.
*/
static bool
AdvanceXLInsertBuffer(void)
@ -890,45 +881,37 @@ AdvanceXLInsertBuffer(void)
if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
/* nope, got work to do... */
unsigned spins = 0;
XLogRecPtr FinishedPageRqstPtr;
FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
for (;;)
/* Before waiting, get info_lck and update LogwrtResult */
SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
update_needed = false; /* Did the shared-request update */
if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
/* While waiting, try to get info_lck and update LogwrtResult */
if (!TAS(&(XLogCtl->info_lck)))
/* OK, someone wrote it already */
Insert->LogwrtResult = LogwrtResult;
}
else
{
/* Must acquire write lock */
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = Write->LogwrtResult;
if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
update_needed = false; /* Did the shared-request update */
LogwrtResult = XLogCtl->LogwrtResult;
S_UNLOCK(&(XLogCtl->info_lck));
if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
/* OK, someone wrote it already */
Insert->LogwrtResult = LogwrtResult;
break;
}
/* OK, someone wrote it already */
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
}
/*
* LogwrtResult lock is busy or we know the page is still
* dirty. Try to acquire logwrt lock and write full blocks.
*/
if (!TAS(&(XLogCtl->logwrt_lck)))
else
{
LogwrtResult = Write->LogwrtResult;
if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
S_UNLOCK(&(XLogCtl->logwrt_lck));
/* OK, someone wrote it already */
Insert->LogwrtResult = LogwrtResult;
break;
}
/*
* Have to write buffers while holding insert lock. This
* is not good, so only write as much as we absolutely
@ -938,11 +921,9 @@ AdvanceXLInsertBuffer(void)
WriteRqst.Flush.xlogid = 0;
WriteRqst.Flush.xrecoff = 0;
XLogWrite(WriteRqst);
S_UNLOCK(&(XLogCtl->logwrt_lck));
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
break;
}
S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
}
}
@ -986,7 +967,7 @@ AdvanceXLInsertBuffer(void)
/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
* Must be called with logwrt_lck held.
* Must be called with WALWriteLock held.
*/
static void
XLogWrite(XLogwrtRqst WriteRqst)
@ -1047,7 +1028,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
"consider increasing WAL_FILES");
/* update pg_control, unless someone else already did */
SpinAcquire(ControlFileLockId);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (ControlFile->logId < openLogId ||
(ControlFile->logId == openLogId &&
ControlFile->logSeg < openLogSeg + 1))
@ -1073,7 +1054,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
kill(getppid(), SIGUSR1);
}
}
SpinRelease(ControlFileLockId);
LWLockRelease(ControlFileLock);
}
if (openLogFile < 0)
@ -1167,13 +1148,13 @@ XLogWrite(XLogwrtRqst WriteRqst)
* 'result' values. This is not absolutely essential, but it saves
* some code in a couple of places.
*/
S_LOCK(&(XLogCtl->info_lck));
SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
XLogCtl->LogwrtResult = LogwrtResult;
if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write))
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush))
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
S_UNLOCK(&(XLogCtl->info_lck));
SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
Write->LogwrtResult = LogwrtResult;
}
@ -1181,7 +1162,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
/*
* Ensure that all XLOG data through the given position is flushed to disk.
*
* NOTE: this differs from XLogWrite mainly in that the logwrt_lck is not
* NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
* already held, and we try to avoid acquiring it if possible.
*/
void
@ -1189,7 +1170,6 @@ XLogFlush(XLogRecPtr record)
{
XLogRecPtr WriteRqstPtr;
XLogwrtRqst WriteRqst;
unsigned spins = 0;
if (XLOG_DEBUG)
{
@ -1224,23 +1204,18 @@ XLogFlush(XLogRecPtr record)
/* initialize to given target; may increase below */
WriteRqstPtr = record;
for (;;)
/* read LogwrtResult and update local state */
SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write))
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
/* done already? */
if (!XLByteLE(record, LogwrtResult.Flush))
{
/* try to read LogwrtResult and update local state */
if (!TAS(&(XLogCtl->info_lck)))
{
if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write))
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
LogwrtResult = XLogCtl->LogwrtResult;
S_UNLOCK(&(XLogCtl->info_lck));
if (XLByteLE(record, LogwrtResult.Flush))
{
/* Done already */
break;
}
}
/* if something was added to log cache then try to flush this too */
if (!TAS(&(XLogCtl->insert_lck)))
if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
uint32 freespace = INSERT_FREESPACE(Insert);
@ -1252,29 +1227,22 @@ XLogFlush(XLogRecPtr record)
WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
WriteRqstPtr.xrecoff -= freespace;
}
S_UNLOCK(&(XLogCtl->insert_lck));
LWLockRelease(WALInsertLock);
}
/* now try to get the logwrt lock */
if (!TAS(&(XLogCtl->logwrt_lck)))
/* now wait for the write lock */
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (!XLByteLE(record, LogwrtResult.Flush))
{
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (XLByteLE(record, LogwrtResult.Flush))
{
/* Done already */
S_UNLOCK(&(XLogCtl->logwrt_lck));
break;
}
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = record;
XLogWrite(WriteRqst);
S_UNLOCK(&(XLogCtl->logwrt_lck));
if (XLByteLT(LogwrtResult.Flush, record))
elog(STOP, "XLogFlush: request %X/%X is not satisfied --- flushed only to %X/%X",
record.xlogid, record.xrecoff,
LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
break;
}
S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
LWLockRelease(WALWriteLock);
}
END_CRIT_SECTION();
@ -1289,9 +1257,9 @@ XLogFlush(XLogRecPtr record)
* pre-existing file will be deleted). On return, TRUE if a pre-existing
* file was used.
*
* use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
* use_lock: if TRUE, acquire ControlFileLock while moving file into
* place. This should be TRUE except during bootstrap log creation. The
* caller must *not* hold the spinlock at call.
* caller must *not* hold the lock at call.
*
* Returns FD of opened file.
*/
@ -1329,7 +1297,7 @@ XLogFileInit(uint32 log, uint32 seg,
* Initialize an empty (all zeroes) segment. NOTE: it is possible
* that another process is doing the same thing. If so, we will end
* up pre-creating an extra log segment. That seems OK, and better
* than holding the spinlock throughout this lengthy process.
* than holding the lock throughout this lengthy process.
*/
snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
XLogDir, (int) getpid());
@ -1423,9 +1391,9 @@ XLogFileInit(uint32 log, uint32 seg,
* point. Fail if no free slot is found in this range. (Irrelevant if
* find_free is FALSE.)
*
* use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
* use_lock: if TRUE, acquire ControlFileLock while moving file into
* place. This should be TRUE except during bootstrap log creation. The
* caller must *not* hold the spinlock at call.
* caller must *not* hold the lock at call.
*
* Returns TRUE if file installed, FALSE if not installed because of
* exceeding max_advance limit. (Any other kind of failure causes elog().)
@ -1444,7 +1412,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
* We want to be sure that only one process does this at a time.
*/
if (use_lock)
SpinAcquire(ControlFileLockId);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (!find_free)
{
@ -1462,7 +1430,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
{
/* Failed to find a free slot within specified range */
if (use_lock)
SpinRelease(ControlFileLockId);
LWLockRelease(ControlFileLock);
return false;
}
NextLogSeg(log, seg);
@ -1487,7 +1455,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
#endif
if (use_lock)
SpinRelease(ControlFileLockId);
LWLockRelease(ControlFileLock);
return true;
}
@ -2319,10 +2287,7 @@ XLOGShmemInit(void)
XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
S_INIT_LOCK(&(XLogCtl->insert_lck));
S_INIT_LOCK(&(XLogCtl->info_lck));
S_INIT_LOCK(&(XLogCtl->logwrt_lck));
S_INIT_LOCK(&(XLogCtl->chkp_lck));
SpinLockInit(&XLogCtl->info_lck);
/*
* If we are not in bootstrap mode, pg_control should already exist.
@ -2821,12 +2786,12 @@ SetThisStartUpID(void)
* in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster
* calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
* subsequently-spawned backends will start out with a reasonably up-to-date
* local RedoRecPtr. Since these operations are not protected by any spinlock
* local RedoRecPtr. Since these operations are not protected by any lock
* and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
* routines at other times!
*
* Note: once spawned, a backend must update its local RedoRecPtr from
* XLogCtl->Insert.RedoRecPtr while holding the insert spinlock. This is
* XLogCtl->Insert.RedoRecPtr while holding the insert lock. This is
* done in XLogInsert().
*/
void
@ -2874,20 +2839,26 @@ CreateCheckPoint(bool shutdown)
uint32 freespace;
uint32 _logId;
uint32 _logSeg;
unsigned spins = 0;
if (MyLastRecPtr.xrecoff != 0)
elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
START_CRIT_SECTION();
/* Grab lock, using larger than normal sleep between tries (1 sec) */
while (TAS(&(XLogCtl->chkp_lck)))
/*
* The CheckpointLock can be held for quite a while, which is not good
* because we won't respond to a cancel/die request while waiting for an
* LWLock. (But the alternative of using a regular lock won't work for
* background checkpoint processes, which are not regular backends.)
* So, rather than use a plain LWLockAcquire, use this kluge to allow
* an interrupt to be accepted while we are waiting:
*/
while (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
{
S_LOCK_SLEEP_INTERVAL(&(XLogCtl->chkp_lck), spins++,
CHECKPOINT_LOCK_TIMEOUT, 1000000);
CHECK_FOR_INTERRUPTS();
sleep(1);
}
START_CRIT_SECTION();
if (shutdown)
{
ControlFile->state = DB_SHUTDOWNING;
@ -2899,7 +2870,7 @@ CreateCheckPoint(bool shutdown)
checkPoint.ThisStartUpID = ThisStartUpID;
checkPoint.time = time(NULL);
S_LOCK(&(XLogCtl->insert_lck));
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
/*
* If this isn't a shutdown, and we have not inserted any XLOG records
@ -2929,8 +2900,8 @@ CreateCheckPoint(bool shutdown)
ControlFile->checkPoint.xrecoff ==
ControlFile->checkPointCopy.redo.xrecoff)
{
S_UNLOCK(&(XLogCtl->insert_lck));
S_UNLOCK(&(XLogCtl->chkp_lck));
LWLockRelease(WALInsertLock);
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
}
@ -2974,17 +2945,17 @@ CreateCheckPoint(bool shutdown)
* Now we can release insert lock, allowing other xacts to proceed
* even while we are flushing disk buffers.
*/
S_UNLOCK(&(XLogCtl->insert_lck));
LWLockRelease(WALInsertLock);
SpinAcquire(XidGenLockId);
LWLockAcquire(XidGenLock, LW_SHARED);
checkPoint.nextXid = ShmemVariableCache->nextXid;
SpinRelease(XidGenLockId);
LWLockRelease(XidGenLock);
SpinAcquire(OidGenLockId);
LWLockAcquire(OidGenLock, LW_SHARED);
checkPoint.nextOid = ShmemVariableCache->nextOid;
if (!shutdown)
checkPoint.nextOid += ShmemVariableCache->oidCount;
SpinRelease(OidGenLockId);
LWLockRelease(OidGenLock);
/*
* Having constructed the checkpoint record, ensure all shmem disk
@ -3039,7 +3010,7 @@ CreateCheckPoint(bool shutdown)
/*
* Update the control file.
*/
SpinAcquire(ControlFileLockId);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (shutdown)
ControlFile->state = DB_SHUTDOWNED;
ControlFile->prevCheckPoint = ControlFile->checkPoint;
@ -3047,7 +3018,7 @@ CreateCheckPoint(bool shutdown)
ControlFile->checkPointCopy = checkPoint;
ControlFile->time = time(NULL);
UpdateControlFile();
SpinRelease(ControlFileLockId);
LWLockRelease(ControlFileLock);
/*
* Delete offline log files (those no longer needed even for previous
@ -3067,7 +3038,7 @@ CreateCheckPoint(bool shutdown)
if (!shutdown)
PreallocXlogFiles(recptr);
S_UNLOCK(&(XLogCtl->chkp_lck));
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
}