1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Assert that buffers are marked dirty before XLogRegisterBuffer().

Enforce the rule from transam/README in XLogRegisterBuffer(), and
update callers to follow the rule.

Hash indexes sometimes register clean pages as a part of the locking
protocol, so provide a REGBUF_NO_CHANGE flag to support that use.

Discussion: https://postgr.es/m/c84114f8-c7f1-5b57-f85a-3adc31e1a904@iki.fi
Reviewed-by: Heikki Linnakangas
This commit is contained in:
Jeff Davis
2023-10-23 17:17:46 -07:00
parent befe9451fb
commit 00d7fb5e2e
10 changed files with 118 additions and 20 deletions

View File

@ -387,24 +387,22 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
START_CRIT_SECTION(); START_CRIT_SECTION();
if (RelationNeedsWAL(btree->index) && !btree->isBuild) if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
if (BufferIsValid(childbuf))
XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
}
/* Perform the page update, and register any extra WAL data */ /*
* Perform the page update, dirty and register stack->buffer, and
* register any extra WAL data.
*/
btree->execPlaceToPage(btree, stack->buffer, stack, btree->execPlaceToPage(btree, stack->buffer, stack,
insertdata, updateblkno, ptp_workspace); insertdata, updateblkno, ptp_workspace);
MarkBufferDirty(stack->buffer);
/* An insert to an internal page finishes the split of the child. */ /* An insert to an internal page finishes the split of the child. */
if (BufferIsValid(childbuf)) if (BufferIsValid(childbuf))
{ {
GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT; GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
MarkBufferDirty(childbuf); MarkBufferDirty(childbuf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
} }
if (RelationNeedsWAL(btree->index) && !btree->isBuild) if (RelationNeedsWAL(btree->index) && !btree->isBuild)

View File

@ -721,9 +721,12 @@ dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
/* Apply changes to page */ /* Apply changes to page */
dataPlaceToPageLeafRecompress(buf, leaf); dataPlaceToPageLeafRecompress(buf, leaf);
MarkBufferDirty(buf);
/* If needed, register WAL data built by computeLeafRecompressWALData */ /* If needed, register WAL data built by computeLeafRecompressWALData */
if (RelationNeedsWAL(btree->index) && !btree->isBuild) if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{ {
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen); XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
} }
} }
@ -1155,6 +1158,8 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
pitem = (PostingItem *) insertdata; pitem = (PostingItem *) insertdata;
GinDataPageAddPostingItem(page, pitem, off); GinDataPageAddPostingItem(page, pitem, off);
MarkBufferDirty(buf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild) if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{ {
/* /*
@ -1167,6 +1172,7 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
data.offset = off; data.offset = off;
data.newitem = *pitem; data.newitem = *pitem;
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) &data, XLogRegisterBufData(0, (char *) &data,
sizeof(ginxlogInsertDataInternal)); sizeof(ginxlogInsertDataInternal));
} }

View File

@ -571,6 +571,8 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
elog(ERROR, "failed to add item to index page in \"%s\"", elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index)); RelationGetRelationName(btree->index));
MarkBufferDirty(buf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild) if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{ {
/* /*
@ -583,6 +585,7 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
data.isDelete = insertData->isDelete; data.isDelete = insertData->isDelete;
data.offset = off; data.offset = off;
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) &data, XLogRegisterBufData(0, (char *) &data,
offsetof(ginxlogInsertEntry, tuple)); offsetof(ginxlogInsertEntry, tuple));
XLogRegisterBufData(0, (char *) insertData->entry, XLogRegisterBufData(0, (char *) insertData->entry,

View File

@ -397,6 +397,9 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
} }
Assert((ptr - collectordata) <= collector->sumsize); Assert((ptr - collectordata) <= collector->sumsize);
MarkBufferDirty(buffer);
if (needWal) if (needWal)
{ {
XLogRegisterBuffer(1, buffer, REGBUF_STANDARD); XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
@ -404,8 +407,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
} }
metadata->tailFreeSize = PageGetExactFreeSpace(page); metadata->tailFreeSize = PageGetExactFreeSpace(page);
MarkBufferDirty(buffer);
} }
/* /*

View File

@ -824,11 +824,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
XLogRegisterData((char *) &xlrec, SizeOfHashDelete); XLogRegisterData((char *) &xlrec, SizeOfHashDelete);
/* /*
* bucket buffer needs to be registered to ensure that we can * bucket buffer was not changed, but still needs to be
* acquire a cleanup lock on it during replay. * registered to ensure that we can acquire a cleanup lock on
* it during replay.
*/ */
if (!xlrec.is_primary_bucket_page) if (!xlrec.is_primary_bucket_page)
XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE); {
uint8 flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
XLogRegisterBuffer(0, bucket_buf, flags);
}
XLogRegisterBuffer(1, buf, REGBUF_STANDARD); XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
XLogRegisterBufData(1, (char *) deletable, XLogRegisterBufData(1, (char *) deletable,

View File

@ -658,11 +658,15 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage); XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage);
/* /*
* bucket buffer needs to be registered to ensure that we can acquire * bucket buffer was not changed, but still needs to be registered to
* a cleanup lock on it during replay. * ensure that we can acquire a cleanup lock on it during replay.
*/ */
if (!xlrec.is_prim_bucket_same_wrt) if (!xlrec.is_prim_bucket_same_wrt)
XLogRegisterBuffer(0, bucketbuf, REGBUF_STANDARD | REGBUF_NO_IMAGE); {
uint8 flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
XLogRegisterBuffer(0, bucketbuf, flags);
}
XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD); XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
if (xlrec.ntups > 0) if (xlrec.ntups > 0)
@ -960,11 +964,16 @@ readpage:
XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents); XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents);
/* /*
* bucket buffer needs to be registered to ensure that * bucket buffer was not changed, but still needs to
* we can acquire a cleanup lock on it during replay. * be registered to ensure that we can acquire a
* cleanup lock on it during replay.
*/ */
if (!xlrec.is_prim_bucket_same_wrt) if (!xlrec.is_prim_bucket_same_wrt)
XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE); {
int flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
XLogRegisterBuffer(0, bucket_buf, flags);
}
XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD); XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
XLogRegisterBufData(1, (char *) itup_offsets, XLogRegisterBufData(1, (char *) itup_offsets,

View File

@ -248,6 +248,20 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE)))); Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
Assert(begininsert_called); Assert(begininsert_called);
/*
* Ordinarily, buffer should be exclusive-locked and marked dirty before
* we get here, otherwise we could end up violating one of the rules in
* access/transam/README.
*
* Some callers intentionally register a clean page and never update that
* page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
* bypass these checks.
*/
#ifdef USE_ASSERT_CHECKING
if (!(flags & REGBUF_NO_CHANGE))
Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
#endif
if (block_id >= max_registered_block_id) if (block_id >= max_registered_block_id)
{ {
if (block_id >= max_registered_buffers) if (block_id >= max_registered_buffers)
@ -1313,8 +1327,8 @@ log_newpage_range(Relation rel, ForkNumber forknum,
START_CRIT_SECTION(); START_CRIT_SECTION();
for (i = 0; i < nbufs; i++) for (i = 0; i < nbufs; i++)
{ {
XLogRegisterBuffer(i, bufpack[i], flags);
MarkBufferDirty(bufpack[i]); MarkBufferDirty(bufpack[i]);
XLogRegisterBuffer(i, bufpack[i], flags);
} }
recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

View File

@ -2098,6 +2098,65 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
return first_block; return first_block;
} }
/*
* BufferIsExclusiveLocked
*
* Checks if buffer is exclusive-locked.
*
* Buffer must be pinned.
*/
/*
 * BufferIsExclusiveLocked
 *
 * Returns true iff this backend holds the buffer's content lock in
 * exclusive mode.
 *
 * The caller must hold a pin on the buffer.
 */
bool
BufferIsExclusiveLocked(Buffer buffer)
{
	BufferDesc *desc;

	/* Local and shared buffers index their descriptor arrays differently. */
	if (BufferIsLocal(buffer))
		desc = GetLocalBufferDescriptor(-buffer - 1);
	else
		desc = GetBufferDescriptor(buffer - 1);

	Assert(BufferIsPinned(buffer));

	return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(desc),
								LW_EXCLUSIVE);
}
/*
* BufferIsDirty
*
* Checks if buffer is already dirty.
*
* Buffer must be pinned and exclusive-locked. (Without an exclusive lock,
* the result may be stale before it's returned.)
*/
/*
 * BufferIsDirty
 *
 * Returns true iff the buffer is currently marked dirty.
 *
 * The caller must hold a pin and the content lock in exclusive mode;
 * without the exclusive lock the answer could become stale before it is
 * even returned.
 */
bool
BufferIsDirty(Buffer buffer)
{
	BufferDesc *desc;

	/* Local and shared buffers index their descriptor arrays differently. */
	if (BufferIsLocal(buffer))
		desc = GetLocalBufferDescriptor(-buffer - 1);
	else
		desc = GetBufferDescriptor(buffer - 1);

	Assert(BufferIsPinned(buffer));
	Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(desc),
								LW_EXCLUSIVE));

	return (pg_atomic_read_u32(&desc->state) & BM_DIRTY) != 0;
}
/* /*
* MarkBufferDirty * MarkBufferDirty
* *

View File

@ -37,6 +37,7 @@
* will be skipped) */ * will be skipped) */
#define REGBUF_KEEP_DATA 0x10 /* include data even if a full-page image #define REGBUF_KEEP_DATA 0x10 /* include data even if a full-page image
* is taken */ * is taken */
#define REGBUF_NO_CHANGE 0x20 /* intentionally register clean buffer */
/* prototypes for public functions in xloginsert.c: */ /* prototypes for public functions in xloginsert.c: */
extern void XLogBeginInsert(void); extern void XLogBeginInsert(void);

View File

@ -179,6 +179,8 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
bool permanent); bool permanent);
extern void ReleaseBuffer(Buffer buffer); extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer);
extern bool BufferIsExclusiveLocked(Buffer buffer);
extern bool BufferIsDirty(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer); extern void MarkBufferDirty(Buffer buffer);
extern void IncrBufferRefCount(Buffer buffer); extern void IncrBufferRefCount(Buffer buffer);
extern void CheckBufferIsPinnedOnce(Buffer buffer); extern void CheckBufferIsPinnedOnce(Buffer buffer);