1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-24 00:23:06 +03:00
Files
postgres/src/backend/access/heap/heapam_xlog.c
Peter Eisentraut 827b4060a8 Remove unnecessary (char *) casts [mem]
Remove (char *) casts around memory functions such as memcmp(),
memcpy(), or memset() where the cast is useless.  Since these
functions don't take char * arguments anyway, these casts are at best
complicated casts to (void *), about which see commit 7f798aca1d.

Reviewed-by: Dagfinn Ilmari Mannsåker <ilmari@ilmari.org>
Discussion: https://www.postgresql.org/message-id/flat/fd1fcedb-3492-4fc8-9e3e-74b97f2db6c7%40eisentraut.org
2025-02-12 08:50:13 +01:00

1346 lines
37 KiB
C

/*-------------------------------------------------------------------------
*
* heapam_xlog.c
* WAL replay logic for heap access method.
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/access/heap/heapam_xlog.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/bufmask.h"
#include "access/heapam.h"
#include "access/visibilitymap.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/freespace.h"
#include "storage/standby.h"
/*
 * heap_xlog_prune_freeze
 *		Redo routine for the XLOG_HEAP2_PRUNE_* family of records.
 *
 * The main record data starts with an xl_heap_prune header, optionally
 * followed by a snapshot conflict horizon XID.  The data registered with
 * block 0 holds the freeze plans and the redirected / dead / unused offset
 * arrays; it is decoded by heap_xlog_deserialize_prune_and_freeze().
 */
static void
heap_xlog_prune_freeze(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    char       *cursor = XLogRecGetData(record);
    xl_heap_prune prune_hdr;
    bool        want_cleanup_lock;
    bool        frees_items;
    Buffer      heapbuf;
    RelFileLocator rlocator;
    BlockNumber blkno;

    XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);

    /* The header may be unaligned inside the record, so copy it out. */
    memcpy(&prune_hdr, cursor, SizeOfHeapPrune);
    cursor += SizeOfHeapPrune;

    want_cleanup_lock = (prune_hdr.flags & XLHP_CLEANUP_LOCK) != 0;
    frees_items = (prune_hdr.flags & (XLHP_HAS_REDIRECTIONS |
                                      XLHP_HAS_DEAD_ITEMS |
                                      XLHP_HAS_NOW_UNUSED_ITEMS)) != 0;

    /*
     * Depending on XLHP_CLEANUP_LOCK we take either a cleanup lock or a plain
     * exclusive lock.  Without a cleanup lock the record must not ask for
     * anything that would move existing tuple data around.
     */
    Assert(want_cleanup_lock ||
           (prune_hdr.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);

    /*
     * Tuples are about to be removed and/or frozen.  In Hot Standby mode,
     * make sure no running query still sees the removed tuples or still
     * regards the frozen xids as running.  When present, the conflict
     * horizon XID directly follows the xl_heap_prune header.
     */
    if ((prune_hdr.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
    {
        TransactionId conflict_xid;

        /* stored unaligned, hence memcpy() */
        memcpy(&conflict_xid, cursor, sizeof(TransactionId));
        cursor += sizeof(TransactionId);

        if (InHotStandby)
            ResolveRecoveryConflictWithSnapshot(conflict_xid,
                                                (prune_hdr.flags & XLHP_IS_CATALOG_REL) != 0,
                                                rlocator);
    }

    /*
     * Fetch the heap page.  If a full-page image was restored there is
     * nothing to replay on the page itself.
     */
    if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
                                      want_cleanup_lock,
                                      &heapbuf) == BLK_NEEDS_REDO)
    {
        Page        page = (Page) BufferGetPage(heapbuf);
        Size        blockdatalen;
        char       *blockdata = XLogRecGetBlockData(record, 0, &blockdatalen);
        xlhp_freeze_plan *plans;
        OffsetNumber *frz_offsets;
        OffsetNumber *redirected;
        OffsetNumber *nowdead;
        OffsetNumber *nowunused;
        int         nplans;
        int         nredirected;
        int         ndead;
        int         nunused;

        heap_xlog_deserialize_prune_and_freeze(blockdata, prune_hdr.flags,
                                               &nplans, &plans, &frz_offsets,
                                               &nredirected, &redirected,
                                               &ndead, &nowdead,
                                               &nunused, &nowunused);

        /*
         * Apply the line pointer changes from the record, defragmenting the
         * page if required.
         */
        if (nredirected > 0 || ndead > 0 || nunused > 0)
            heap_page_prune_execute(heapbuf,
                                    !want_cleanup_lock,
                                    redirected, nredirected,
                                    nowdead, ndead,
                                    nowunused, nunused);

        /* Execute every freeze plan against the tuples it lists. */
        for (int planno = 0; planno < nplans; planno++)
        {
            HeapTupleFreeze frz;

            /*
             * Translate the WAL representation of the plan into the
             * per-tuple form heap_execute_freeze_tuple() expects.
             */
            frz.offset = InvalidOffsetNumber;   /* unused, but be tidy */
            frz.frzflags = plans[planno].frzflags;
            frz.t_infomask = plans[planno].t_infomask;
            frz.t_infomask2 = plans[planno].t_infomask2;
            frz.xmax = plans[planno].xmax;

            for (int i = 0; i < plans[planno].ntuples; i++)
            {
                ItemId      lp = PageGetItemId(page, frz_offsets[i]);
                HeapTupleHeader tuple = (HeapTupleHeader) PageGetItem(page, lp);

                heap_execute_freeze_tuple(tuple, &frz);
            }

            /* step past the offsets consumed by this plan */
            frz_offsets += plans[planno].ntuples;
        }

        /* The block data must have been consumed exactly. */
        Assert((char *) frz_offsets == blockdata + blockdatalen);

        /*
         * The page's prunability hint is deliberately left alone; at worst
         * an extra prune cycle happens soon.
         */
        PageSetLSN(page, end_lsn);
        MarkBufferDirty(heapbuf);
    }

    /*
     * If space or line pointers were released, tell the free space map.
     *
     * This happens even when a full-page image was applied, because the FSM
     * data does not live in the heap page.
     */
    if (BufferIsValid(heapbuf))
    {
        Size        freespace = 0;

        if (frees_items)
            freespace = PageGetHeapFreeSpace(BufferGetPage(heapbuf));

        UnlockReleaseBuffer(heapbuf);

        if (frees_items)
            XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
    }
}
/*
 * heap_xlog_visible
 *		Redo routine for XLOG_HEAP2_VISIBLE records.
 *
 * Block 1 of the record is the heap page, block 0 the visibility map page.
 *
 * Integrity rule: the visibility map bit must never be set while the heap
 * page's PD_ALL_VISIBLE bit is clear, because a later modification of the
 * page would then fail to clear the visibility map bit.
 */
static void
heap_xlog_visible(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_visible *visrec = (xl_heap_visible *) XLogRecGetData(record);
    RelFileLocator rlocator;
    BlockNumber heapblk;
    Buffer      heapbuf;
    Buffer      vmbuf = InvalidBuffer;
    XLogRedoAction vmaction;
    Page        vmpage;
    Relation    fakerel;
    uint8       vmbits;

    Assert((visrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == visrec->flags);

    XLogRecGetBlockTag(record, 1, &rlocator, NULL, &heapblk);

    /*
     * A Hot Standby transaction whose xmin horizon is old enough that this
     * page is not all-visible to it could wrongly let an index-only scan
     * skip the heap fetch, so resolve the conflict first.
     *
     * NB: A "soft" conflict that merely forces in-flight index-only scans to
     * do heap fetches might be nicer than killing the transaction outright.
     */
    if (InHotStandby)
        ResolveRecoveryConflictWithSnapshot(visrec->snapshotConflictHorizon,
                                            visrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
                                            rlocator);

    /*
     * Read the heap page if it still exists.  If the heap file is dropped or
     * truncated later in recovery the page needs no update, but the
     * visibility map had better be updated regardless.
     */
    if (XLogReadBufferForRedo(record, 1, &heapbuf) == BLK_NEEDS_REDO)
    {
        Page        heappage = BufferGetPage(heapbuf);

        /*
         * Setting the visibility map bit does not bump the heap page LSN,
         * except when checksums or wal_log_hints are enabled, in which case
         * it must.  That leaves a torn-page hazard, but nothing here
         * inspects existing page contents, so it does not matter.
         */
        PageSetAllVisible(heappage);

        if (XLogHintBitIsNeeded())
            PageSetLSN(heappage, end_lsn);

        MarkBufferDirty(heapbuf);
    }

    /*
     * On BLK_RESTORED the backed-up heap block has already been put in place
     * and nothing more is needed for it.  (That can only happen with
     * checksums or wal_log_hints enabled.)
     */

    if (BufferIsValid(heapbuf))
    {
        Size        space = PageGetFreeSpace(BufferGetPage(heapbuf));

        UnlockReleaseBuffer(heapbuf);

        /*
         * The FSM is not WAL-logged and only maintained heuristically, so on
         * a standby it goes stale easily.  After promotion, VACUUM skips
         * refreshing the free space figures of pages that became all-visible
         * (or all-frozen, depending on the vacuum mode), and
         * FreeSpaceMapVacuum then propagates overly optimistic values to the
         * upper FSM layers.  Inserters subsequently try such pages only to
         * find them unusable, which can cause long stalls when there are
         * many of them.
         *
         * Head that off by refreshing the FSM entry of a page that is
         * becoming all-visible or all-frozen.
         *
         * This is done whether or not a full-page image was applied, since
         * the FSM data is not part of the page.
         */
        if (visrec->flags & VISIBILITYMAP_VALID_BITS)
            XLogRecordPageWithFreeSpace(rlocator, heapblk, space);
    }

    /*
     * Updating the visibility map is safe even when the heap page update was
     * skipped because of the LSN interlock: every WAL record that clears the
     * visibility map bit does so before looking at the page LSN, so any bit
     * that has to be cleared will still be cleared.
     */
    vmaction = XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR,
                                             false, &vmbuf);
    if (vmaction != BLK_NEEDS_REDO)
    {
        if (BufferIsValid(vmbuf))
            UnlockReleaseBuffer(vmbuf);
        return;
    }

    vmpage = BufferGetPage(vmbuf);

    /* a page that was read as zeros still needs initializing */
    if (PageIsNew(vmpage))
        PageInit(vmpage, BLCKSZ, 0);

    /* strip the VISIBILITYMAP_XLOG_* bits */
    vmbits = visrec->flags & VISIBILITYMAP_VALID_BITS;

    /*
     * XLogReadBufferForRedoExtended returned the buffer locked, but
     * visibilitymap_set does its own locking, so drop the lock here.
     */
    LockBuffer(vmbuf, BUFFER_LOCK_UNLOCK);

    fakerel = CreateFakeRelcacheEntry(rlocator);
    visibilitymap_pin(fakerel, heapblk, &vmbuf);

    visibilitymap_set(fakerel, heapblk, InvalidBuffer, end_lsn, vmbuf,
                      visrec->snapshotConflictHorizon, vmbits);

    ReleaseBuffer(vmbuf);
    FreeFakeRelcacheEntry(fakerel);
}
/*
 * fix_infomask_from_infobits
 *		Translate the "infobits" field of an XLog record into tuple header
 *		flag bits.
 *
 * The xmax-related lock bits in *infomask and HEAP_KEYS_UPDATED in
 * *infomask2 are cleared first and then set again according to infobits.
 *
 * (This is the reverse of compute_infobits).
 */
static void
fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
{
    uint16      xmax_bits = 0;

    /* Collect the infomask bits requested by the record. */
    if (infobits & XLHL_XMAX_IS_MULTI)
        xmax_bits |= HEAP_XMAX_IS_MULTI;
    if (infobits & XLHL_XMAX_LOCK_ONLY)
        xmax_bits |= HEAP_XMAX_LOCK_ONLY;
    if (infobits & XLHL_XMAX_EXCL_LOCK)
        xmax_bits |= HEAP_XMAX_EXCL_LOCK;
    /* note HEAP_XMAX_SHR_LOCK isn't considered here */
    if (infobits & XLHL_XMAX_KEYSHR_LOCK)
        xmax_bits |= HEAP_XMAX_KEYSHR_LOCK;

    /* Reset everything this function owns ... */
    *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
                   HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
    *infomask2 &= ~HEAP_KEYS_UPDATED;

    /* ... and install the translated bits. */
    *infomask |= xmax_bits;
    if (infobits & XLHL_KEYS_UPDATED)
        *infomask2 |= HEAP_KEYS_UPDATED;
}
/*
 * heap_xlog_delete
 *		Redo routine for XLOG_HEAP_DELETE records.
 */
static void
heap_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_delete *delrec = (xl_heap_delete *) XLogRecGetData(record);
    RelFileLocator target_locator;
    BlockNumber blkno;
    ItemPointerData self_tid;
    Buffer      heapbuf;

    XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
    ItemPointerSetBlockNumber(&self_tid, blkno);
    ItemPointerSetOffsetNumber(&self_tid, delrec->offnum);

    /*
     * The visibility map may have to be corrected even when the heap page
     * itself is already up to date.
     */
    if (delrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
    {
        Relation    fakerel = CreateFakeRelcacheEntry(target_locator);
        Buffer      vmbuf = InvalidBuffer;

        visibilitymap_pin(fakerel, blkno, &vmbuf);
        visibilitymap_clear(fakerel, blkno, vmbuf, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuf);
        FreeFakeRelcacheEntry(fakerel);
    }

    if (XLogReadBufferForRedo(record, 0, &heapbuf) == BLK_NEEDS_REDO)
    {
        Page        page = BufferGetPage(heapbuf);
        ItemId      lp = NULL;
        HeapTupleHeader htup;

        /* The target must be an existing, normal line pointer. */
        if (delrec->offnum <= PageGetMaxOffsetNumber(page))
            lp = PageGetItemId(page, delrec->offnum);

        if (lp == NULL || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        /* Wipe the old xmax state, then rebuild it from the record. */
        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        HeapTupleHeaderClearHotUpdated(htup);
        fix_infomask_from_infobits(delrec->infobits_set,
                                   &htup->t_infomask, &htup->t_infomask2);

        /* A "super" delete invalidates xmin instead of setting xmax. */
        if (delrec->flags & XLH_DELETE_IS_SUPER)
            HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
        else
            HeapTupleHeaderSetXmax(htup, delrec->xmax);
        HeapTupleHeaderSetCmax(htup, FirstCommandId, false);

        /* The page is now a pruning candidate. */
        PageSetPrunable(page, XLogRecGetXid(record));

        if (delrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        /* Leave t_ctid in the right state. */
        if (delrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
            HeapTupleHeaderSetMovedPartitions(htup);
        else
            htup->t_ctid = self_tid;

        PageSetLSN(page, end_lsn);
        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);
}
/*
 * heap_xlog_insert
 *		Redo routine for XLOG_HEAP_INSERT records.
 *
 * The data registered with block 0 is an xl_heap_header followed by the
 * tuple contents that come after the fixed-size tuple header.
 */
static void
heap_xlog_insert(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_insert *insrec = (xl_heap_insert *) XLogRecGetData(record);
    RelFileLocator target_locator;
    BlockNumber blkno;
    ItemPointerData self_tid;
    Buffer      heapbuf;
    XLogRedoAction action;
    Size        freespace = 0;

    XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
    ItemPointerSetBlockNumber(&self_tid, blkno);
    ItemPointerSetOffsetNumber(&self_tid, insrec->offnum);

    /*
     * The visibility map may have to be corrected even when the heap page
     * itself is already up to date.
     */
    if (insrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
    {
        Relation    fakerel = CreateFakeRelcacheEntry(target_locator);
        Buffer      vmbuf = InvalidBuffer;

        visibilitymap_pin(fakerel, blkno, &vmbuf);
        visibilitymap_clear(fakerel, blkno, vmbuf, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuf);
        FreeFakeRelcacheEntry(fakerel);
    }

    /*
     * When the tuple was the first and only one on the page, the page is
     * rebuilt from scratch; otherwise read it normally.
     */
    if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
    {
        heapbuf = XLogInitBufferForRedo(record, 0);
        PageInit(BufferGetPage(heapbuf), BufferGetPageSize(heapbuf), 0);
        action = BLK_NEEDS_REDO;
    }
    else
        action = XLogReadBufferForRedo(record, 0, &heapbuf);

    if (action == BLK_NEEDS_REDO)
    {
        union
        {
            HeapTupleHeaderData hdr;
            char        data[MaxHeapTupleSize];
        }           scratch;
        HeapTupleHeader newtup = &scratch.hdr;
        xl_heap_header xlhdr;
        Page        page = BufferGetPage(heapbuf);
        Size        datalen;
        char       *payload;
        uint32      bodylen;
        uint32      itemlen;

        if (PageGetMaxOffsetNumber(page) + 1 < insrec->offnum)
            elog(PANIC, "invalid max offset number");

        payload = XLogRecGetBlockData(record, 0, &datalen);

        bodylen = datalen - SizeOfHeapHeader;
        Assert(datalen > SizeOfHeapHeader && bodylen <= MaxHeapTupleSize);

        /* Pull the (possibly unaligned) xl_heap_header off the front. */
        memcpy(&xlhdr, payload, SizeOfHeapHeader);
        payload += SizeOfHeapHeader;

        /* Start from a zeroed header and append the logged tuple body. */
        MemSet(newtup, 0, SizeofHeapTupleHeader);
        /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
        memcpy((char *) newtup + SizeofHeapTupleHeader,
               payload,
               bodylen);
        itemlen = bodylen + SizeofHeapTupleHeader;

        /* Fill in the header fields from the record. */
        newtup->t_infomask2 = xlhdr.t_infomask2;
        newtup->t_infomask = xlhdr.t_infomask;
        newtup->t_hoff = xlhdr.t_hoff;
        HeapTupleHeaderSetXmin(newtup, XLogRecGetXid(record));
        HeapTupleHeaderSetCmin(newtup, FirstCommandId);
        newtup->t_ctid = self_tid;

        if (PageAddItem(page, (Item) newtup, itemlen, insrec->offnum,
                        true, true) == InvalidOffsetNumber)
            elog(PANIC, "failed to add tuple");

        freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */

        PageSetLSN(page, end_lsn);

        if (insrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
        if (insrec->flags & XLH_INSERT_ALL_FROZEN_SET)
            PageSetAllVisible(page);

        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);

    /*
     * Update the FSM too if the page is getting low on free space, where
     * "low" is arbitrarily defined as less than 20%.  Nothing better is
     * possible without knowing the table's fill-factor.
     *
     * XXX: This is skipped when the page was restored from a full-page
     * image; the FSM does not have to be totally accurate anyway.
     */
    if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
        XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
}
/*
 * heap_xlog_multi_insert
 *		Redo routine for XLOG_HEAP2_MULTI_INSERT records.
 *
 * The tuples are stored as the data of block 0: each is an
 * xl_multi_insert_tuple header, located at a SHORTALIGN'd position, followed
 * by datalen bytes of tuple body.
 */
static void
heap_xlog_multi_insert(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_multi_insert *mirec;
    bool        isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
    RelFileLocator rlocator;
    BlockNumber blkno;
    Buffer      heapbuf;
    XLogRedoAction action;
    Size        freespace = 0;

    /*
     * Inserting does not overwrite MVCC data, so there is no conflict
     * processing to do.
     */
    mirec = (xl_heap_multi_insert *) XLogRecGetData(record);

    XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);

    /* the two flags are mutually exclusive; both set would be a bug */
    Assert(!((mirec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
             (mirec->flags & XLH_INSERT_ALL_FROZEN_SET)));

    /*
     * The visibility map may have to be corrected even when the heap page
     * itself is already up to date.
     */
    if (mirec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
    {
        Relation    fakerel = CreateFakeRelcacheEntry(rlocator);
        Buffer      vmbuf = InvalidBuffer;

        visibilitymap_pin(fakerel, blkno, &vmbuf);
        visibilitymap_clear(fakerel, blkno, vmbuf, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuf);
        FreeFakeRelcacheEntry(fakerel);
    }

    if (isinit)
    {
        heapbuf = XLogInitBufferForRedo(record, 0);
        PageInit(BufferGetPage(heapbuf), BufferGetPageSize(heapbuf), 0);
        action = BLK_NEEDS_REDO;
    }
    else
        action = XLogReadBufferForRedo(record, 0, &heapbuf);

    if (action == BLK_NEEDS_REDO)
    {
        union
        {
            HeapTupleHeaderData hdr;
            char        data[MaxHeapTupleSize];
        }           scratch;
        HeapTupleHeader newtup = &scratch.hdr;
        Page        page;
        Size        blocklen;
        char       *readptr;
        char       *stopptr;

        /* Tuples are stored as block data */
        readptr = XLogRecGetBlockData(record, 0, &blocklen);
        stopptr = readptr + blocklen;

        page = (Page) BufferGetPage(heapbuf);

        for (int tupno = 0; tupno < mirec->ntuples; tupno++)
        {
            xl_multi_insert_tuple *tuphdr;
            OffsetNumber offnum;
            uint32      bodylen;
            uint32      itemlen;

            /*
             * On a reinitialized page the tuples simply occupy consecutive
             * offsets starting at FirstOffsetNumber.  Otherwise the record
             * carries an explicit array of target offsets.
             */
            offnum = isinit ? FirstOffsetNumber + tupno : mirec->offsets[tupno];

            if (PageGetMaxOffsetNumber(page) + 1 < offnum)
                elog(PANIC, "invalid max offset number");

            /* locate this tuple's header and step over it */
            tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(readptr);
            readptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;

            bodylen = tuphdr->datalen;
            Assert(bodylen <= MaxHeapTupleSize);

            /* Start from a zeroed header and append the logged body. */
            MemSet(newtup, 0, SizeofHeapTupleHeader);
            /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
            memcpy((char *) newtup + SizeofHeapTupleHeader,
                   readptr,
                   bodylen);
            readptr += bodylen;
            itemlen = bodylen + SizeofHeapTupleHeader;

            newtup->t_infomask2 = tuphdr->t_infomask2;
            newtup->t_infomask = tuphdr->t_infomask;
            newtup->t_hoff = tuphdr->t_hoff;
            HeapTupleHeaderSetXmin(newtup, XLogRecGetXid(record));
            HeapTupleHeaderSetCmin(newtup, FirstCommandId);
            ItemPointerSetBlockNumber(&newtup->t_ctid, blkno);
            ItemPointerSetOffsetNumber(&newtup->t_ctid, offnum);

            offnum = PageAddItem(page, (Item) newtup, itemlen, offnum, true, true);
            if (offnum == InvalidOffsetNumber)
                elog(PANIC, "failed to add tuple");
        }

        /* every byte of the block data must have been consumed */
        if (readptr != stopptr)
            elog(PANIC, "total tuple length mismatch");

        freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */

        PageSetLSN(page, end_lsn);

        if (mirec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
        if (mirec->flags & XLH_INSERT_ALL_FROZEN_SET)
            PageSetAllVisible(page);

        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);

    /*
     * Update the FSM too if the page is getting low on free space, where
     * "low" is arbitrarily defined as less than 20%.  Nothing better is
     * possible without knowing the table's fill-factor.
     *
     * XXX: This is skipped when the page was restored from a full-page
     * image; the FSM does not have to be totally accurate anyway.
     */
    if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
        XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
}
/*
 * Replay XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE records.
 *
 * record: the WAL record being replayed.
 * hot_update: true when replaying XLOG_HEAP_HOT_UPDATE, false for
 * XLOG_HEAP_UPDATE.
 *
 * Block 0 of the record is the page receiving the new tuple version. Block 1
 * is present only when the old tuple version lives on a different page; if
 * it is absent, old and new page are the same.
 *
 * The new tuple is rebuilt from the data attached to block 0, optionally
 * combined with a prefix and/or suffix copied from the old tuple on the page.
 */
static void
heap_xlog_update(XLogReaderState *record, bool hot_update)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
RelFileLocator rlocator;
BlockNumber oldblk;
BlockNumber newblk;
ItemPointerData newtid;
Buffer obuffer,
nbuffer;
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
HeapTupleData oldtup;
HeapTupleHeader htup;
/* lengths of the parts of the new tuple that are taken from the old one */
uint16 prefixlen = 0,
suffixlen = 0;
char *newp;
/* scratch space in which the new tuple is assembled */
union
{
HeapTupleHeaderData hdr;
char data[MaxHeapTupleSize];
} tbuf;
xl_heap_header xlhdr;
uint32 newlen;
Size freespace = 0;
XLogRedoAction oldaction;
XLogRedoAction newaction;
/* initialize to keep the compiler quiet */
oldtup.t_data = NULL;
oldtup.t_len = 0;
XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
{
/* HOT updates are never done across pages */
Assert(!hot_update);
}
else
oldblk = newblk;
ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
/*
 * The visibility map may need to be fixed even if the heap page is
 * already up-to-date.
 */
if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(rlocator);
Buffer vmbuffer = InvalidBuffer;
visibilitymap_pin(reln, oldblk, &vmbuffer);
visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
ReleaseBuffer(vmbuffer);
FreeFakeRelcacheEntry(reln);
}
/*
 * In normal operation, it is important to lock the two pages in
 * page-number order, to avoid possible deadlocks against other update
 * operations going the other way. However, during WAL replay there can
 * be no other update happening, so we don't need to worry about that. But
 * we *do* need to worry that we don't expose an inconsistent state to Hot
 * Standby queries --- so the original page can't be unlocked before we've
 * added the new tuple to the new page.
 */
/* Deal with old tuple version */
/* The old page is block 0 for a same-page update, block 1 otherwise. */
oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
&obuffer);
if (oldaction == BLK_NEEDS_REDO)
{
page = BufferGetPage(obuffer);
offnum = xlrec->old_offnum;
/* The old tuple must be an existing, normal line pointer. */
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum);
if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
elog(PANIC, "invalid lp");
htup = (HeapTupleHeader) PageGetItem(page, lp);
/* Remember the old tuple; its data may be needed for prefix/suffix below. */
oldtup.t_data = htup;
oldtup.t_len = ItemIdGetLength(lp);
/* Clear the old xmax state, then rebuild it from the record. */
htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
if (hot_update)
HeapTupleHeaderSetHotUpdated(htup);
else
HeapTupleHeaderClearHotUpdated(htup);
fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
&htup->t_infomask2);
HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Set forward chain link in t_ctid */
htup->t_ctid = newtid;
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, XLogRecGetXid(record));
if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
PageSetLSN(page, lsn);
MarkBufferDirty(obuffer);
}
/*
 * Read the page the new tuple goes into, if different from old.
 */
if (oldblk == newblk)
{
/* Same page: reuse the buffer and redo decision obtained above. */
nbuffer = obuffer;
newaction = oldaction;
}
else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
{
/* The record asks for the new page to be initialized from scratch. */
nbuffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(nbuffer);
PageInit(page, BufferGetPageSize(nbuffer), 0);
newaction = BLK_NEEDS_REDO;
}
else
newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
/*
 * The visibility map may need to be fixed even if the heap page is
 * already up-to-date.
 */
if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(rlocator);
Buffer vmbuffer = InvalidBuffer;
visibilitymap_pin(reln, newblk, &vmbuffer);
visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
ReleaseBuffer(vmbuffer);
FreeFakeRelcacheEntry(reln);
}
/* Deal with new tuple */
if (newaction == BLK_NEEDS_REDO)
{
char *recdata;
char *recdata_end;
Size datalen;
Size tuplen;
recdata = XLogRecGetBlockData(record, 0, &datalen);
recdata_end = recdata + datalen;
page = BufferGetPage(nbuffer);
offnum = xlrec->new_offnum;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "invalid max offset number");
/*
 * The block data starts with the optional prefix and suffix lengths (in
 * that order, each a uint16 read with memcpy), followed by the
 * xl_heap_header and then the tuple data stored in the record.
 */
if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
{
Assert(newblk == oldblk);
memcpy(&prefixlen, recdata, sizeof(uint16));
recdata += sizeof(uint16);
}
if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
{
Assert(newblk == oldblk);
memcpy(&suffixlen, recdata, sizeof(uint16));
recdata += sizeof(uint16);
}
memcpy(&xlhdr, recdata, SizeOfHeapHeader);
recdata += SizeOfHeapHeader;
/* Everything left in the block data belongs to the new tuple. */
tuplen = recdata_end - recdata;
Assert(tuplen <= MaxHeapTupleSize);
htup = &tbuf.hdr;
MemSet(htup, 0, SizeofHeapTupleHeader);
/*
 * Reconstruct the new tuple using the prefix and/or suffix from the
 * old tuple, and the data stored in the WAL record.
 */
newp = (char *) htup + SizeofHeapTupleHeader;
if (prefixlen > 0)
{
int len;
/* copy bitmap [+ padding] [+ oid] from WAL record */
len = xlhdr.t_hoff - SizeofHeapTupleHeader;
memcpy(newp, recdata, len);
recdata += len;
newp += len;
/* copy prefix from old tuple */
memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
newp += prefixlen;
/* copy new tuple data from WAL record */
len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
memcpy(newp, recdata, len);
recdata += len;
newp += len;
}
else
{
/*
 * copy bitmap [+ padding] [+ oid] + data from record, all in one
 * go
 */
memcpy(newp, recdata, tuplen);
recdata += tuplen;
newp += tuplen;
}
/* The block data must have been consumed exactly. */
Assert(recdata == recdata_end);
/* copy suffix from old tuple */
if (suffixlen > 0)
memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
/* Total item length: fixed header + logged data + borrowed prefix/suffix. */
newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
htup->t_infomask2 = xlhdr.t_infomask2;
htup->t_infomask = xlhdr.t_infomask;
htup->t_hoff = xlhdr.t_hoff;
HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, FirstCommandId);
HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = newtid;
offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "failed to add tuple");
if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
PageSetLSN(page, lsn);
MarkBufferDirty(nbuffer);
}
/*
 * Release the new page's buffer first (when it is a distinct buffer), then
 * the old page's buffer. For a same-page update nbuffer equals obuffer and
 * is released only once.
 */
if (BufferIsValid(nbuffer) && nbuffer != obuffer)
UnlockReleaseBuffer(nbuffer);
if (BufferIsValid(obuffer))
UnlockReleaseBuffer(obuffer);
/*
 * If the new page is running low on free space, update the FSM as well.
 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
 * better than that without knowing the fill-factor for the table.
 *
 * However, don't update the FSM on HOT updates, because after crash
 * recovery, either the old or the new tuple will certainly be dead and
 * prunable. After pruning, the page will have roughly as much free space
 * as it did before the update, assuming the new tuple is about the same
 * size as the old one.
 *
 * XXX: Don't do this if the page was restored from full page image. We
 * don't bother to update the FSM in that case, it doesn't need to be
 * totally accurate anyway.
 */
if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
}
/*
 * heap_xlog_confirm
 *		Redo routine for XLOG_HEAP_CONFIRM records.
 *
 * Points the tuple's t_ctid at the tuple itself, which marks it as actually
 * inserted.
 */
static void
heap_xlog_confirm(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_confirm *confrec = (xl_heap_confirm *) XLogRecGetData(record);
    Buffer      heapbuf;

    if (XLogReadBufferForRedo(record, 0, &heapbuf) == BLK_NEEDS_REDO)
    {
        Page        page = BufferGetPage(heapbuf);
        OffsetNumber offnum = confrec->offnum;
        ItemId      lp = NULL;
        HeapTupleHeader htup;

        /* The target must be an existing, normal line pointer. */
        if (offnum <= PageGetMaxOffsetNumber(page))
            lp = PageGetItemId(page, offnum);

        if (lp == NULL || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        /*
         * Confirm tuple as actually inserted
         */
        ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(heapbuf), offnum);

        PageSetLSN(page, end_lsn);
        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);
}
/*
 * heap_xlog_lock
 *		Redo routine for XLOG_HEAP_LOCK records.
 */
static void
heap_xlog_lock(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_lock *lockrec = (xl_heap_lock *) XLogRecGetData(record);
    Buffer      heapbuf;

    /*
     * The visibility map may have to be corrected even when the heap page
     * itself is already up to date.
     */
    if (lockrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
    {
        RelFileLocator rlocator;
        BlockNumber heapblk;
        Relation    fakerel;
        Buffer      vmbuf = InvalidBuffer;

        XLogRecGetBlockTag(record, 0, &rlocator, NULL, &heapblk);
        fakerel = CreateFakeRelcacheEntry(rlocator);

        visibilitymap_pin(fakerel, heapblk, &vmbuf);
        visibilitymap_clear(fakerel, heapblk, vmbuf, VISIBILITYMAP_ALL_FROZEN);

        ReleaseBuffer(vmbuf);
        FreeFakeRelcacheEntry(fakerel);
    }

    if (XLogReadBufferForRedo(record, 0, &heapbuf) == BLK_NEEDS_REDO)
    {
        Page        page = (Page) BufferGetPage(heapbuf);
        OffsetNumber offnum = lockrec->offnum;
        ItemId      lp = NULL;
        HeapTupleHeader htup;

        /* The target must be an existing, normal line pointer. */
        if (offnum <= PageGetMaxOffsetNumber(page))
            lp = PageGetItemId(page, offnum);

        if (lp == NULL || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        /* Wipe the old xmax state, then rebuild it from the record. */
        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        fix_infomask_from_infobits(lockrec->infobits_set, &htup->t_infomask,
                                   &htup->t_infomask2);

        /*
         * The update-related flags are cleared only when the rebuilt
         * infomask says the tuple is merely locked, not updated.
         */
        if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
        {
            HeapTupleHeaderClearHotUpdated(htup);
            /* Make sure there is no forward chain link in t_ctid */
            ItemPointerSet(&htup->t_ctid,
                           BufferGetBlockNumber(heapbuf),
                           offnum);
        }

        HeapTupleHeaderSetXmax(htup, lockrec->xmax);
        HeapTupleHeaderSetCmax(htup, FirstCommandId, false);

        PageSetLSN(page, end_lsn);
        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);
}
/*
 * heap_xlog_lock_updated
 *		Redo routine for XLOG_HEAP2_LOCK_UPDATED records.
 */
static void
heap_xlog_lock_updated(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_lock_updated *lockrec =
        (xl_heap_lock_updated *) XLogRecGetData(record);
    Buffer      heapbuf;

    /*
     * The visibility map may have to be corrected even when the heap page
     * itself is already up to date.
     */
    if (lockrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
    {
        RelFileLocator rlocator;
        BlockNumber heapblk;
        Relation    fakerel;
        Buffer      vmbuf = InvalidBuffer;

        XLogRecGetBlockTag(record, 0, &rlocator, NULL, &heapblk);
        fakerel = CreateFakeRelcacheEntry(rlocator);

        visibilitymap_pin(fakerel, heapblk, &vmbuf);
        visibilitymap_clear(fakerel, heapblk, vmbuf, VISIBILITYMAP_ALL_FROZEN);

        ReleaseBuffer(vmbuf);
        FreeFakeRelcacheEntry(fakerel);
    }

    if (XLogReadBufferForRedo(record, 0, &heapbuf) == BLK_NEEDS_REDO)
    {
        Page        page = BufferGetPage(heapbuf);
        OffsetNumber offnum = lockrec->offnum;
        ItemId      lp = NULL;
        HeapTupleHeader htup;

        /* The target must be an existing, normal line pointer. */
        if (offnum <= PageGetMaxOffsetNumber(page))
            lp = PageGetItemId(page, offnum);

        if (lp == NULL || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        /* Wipe the old xmax state, then rebuild it from the record. */
        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        fix_infomask_from_infobits(lockrec->infobits_set, &htup->t_infomask,
                                   &htup->t_infomask2);
        HeapTupleHeaderSetXmax(htup, lockrec->xmax);

        PageSetLSN(page, end_lsn);
        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);
}
/*
 * heap_xlog_inplace
 *		Redo routine for XLOG_HEAP_INPLACE records.
 *
 * Overwrites the data portion of an existing tuple with the bytes attached
 * to block 0, then processes the invalidation messages carried in the
 * record.
 */
static void
heap_xlog_inplace(XLogReaderState *record)
{
    XLogRecPtr  end_lsn = record->EndRecPtr;
    xl_heap_inplace *inprec = (xl_heap_inplace *) XLogRecGetData(record);
    Buffer      heapbuf;

    if (XLogReadBufferForRedo(record, 0, &heapbuf) == BLK_NEEDS_REDO)
    {
        Size        newlen;
        char       *newdata = XLogRecGetBlockData(record, 0, &newlen);
        Page        page = BufferGetPage(heapbuf);
        OffsetNumber offnum = inprec->offnum;
        ItemId      lp = NULL;
        HeapTupleHeader htup;
        uint32      oldlen;

        /* The target must be an existing, normal line pointer. */
        if (offnum <= PageGetMaxOffsetNumber(page))
            lp = PageGetItemId(page, offnum);

        if (lp == NULL || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        /* The replacement data must be exactly as long as the old data. */
        oldlen = ItemIdGetLength(lp) - htup->t_hoff;
        if (oldlen != newlen)
            elog(PANIC, "wrong tuple length");

        memcpy((char *) htup + htup->t_hoff, newdata, newlen);

        PageSetLSN(page, end_lsn);
        MarkBufferDirty(heapbuf);
    }

    if (BufferIsValid(heapbuf))
        UnlockReleaseBuffer(heapbuf);

    /* Done after the buffer has been released. */
    ProcessCommittedInvalidationMessages(inprec->msgs,
                                         inprec->nmsgs,
                                         inprec->relcacheInitFileInval,
                                         inprec->dbId,
                                         inprec->tsId);
}
/*
 * Redo entry point for heap records: pick the replay routine matching the
 * record's operation code.  An unrecognized code is a PANIC.
 */
void
heap_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
	uint8		opcode = info & XLOG_HEAP_OPMASK;

	/*
	 * None of the operations handled here overwrite MVCC data, so no
	 * conflict processing is needed.  (The heap2 rmgr operations do.)
	 */
	switch (opcode)
	{
		case XLOG_HEAP_INSERT:
			heap_xlog_insert(record);
			break;

		case XLOG_HEAP_DELETE:
			heap_xlog_delete(record);
			break;

			/* Plain and HOT updates share one routine, told apart by a flag. */
		case XLOG_HEAP_UPDATE:
			heap_xlog_update(record, false);
			break;

		case XLOG_HEAP_HOT_UPDATE:
			heap_xlog_update(record, true);
			break;

		case XLOG_HEAP_CONFIRM:
			heap_xlog_confirm(record);
			break;

		case XLOG_HEAP_LOCK:
			heap_xlog_lock(record);
			break;

		case XLOG_HEAP_INPLACE:
			heap_xlog_inplace(record);
			break;

		case XLOG_HEAP_TRUNCATE:

			/*
			 * Nothing to replay: the actual truncation is already covered by
			 * SMGR WAL records.  The TRUNCATE record exists only for logical
			 * decoding.
			 */
			break;

		default:
			elog(PANIC, "heap_redo: unknown op code %u", info);
	}
}
/*
 * Redo entry point for heap2 records: pick the replay routine matching the
 * record's operation code.  An unrecognized code is a PANIC.
 */
void
heap2_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
	uint8		opcode = info & XLOG_HEAP_OPMASK;

	switch (opcode)
	{
			/* All three prune flavors are replayed by the same routine. */
		case XLOG_HEAP2_PRUNE_ON_ACCESS:
		case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
		case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
			heap_xlog_prune_freeze(record);
			break;

		case XLOG_HEAP2_VISIBLE:
			heap_xlog_visible(record);
			break;

		case XLOG_HEAP2_MULTI_INSERT:
			heap_xlog_multi_insert(record);
			break;

		case XLOG_HEAP2_LOCK_UPDATED:
			heap_xlog_lock_updated(record);
			break;

		case XLOG_HEAP2_REWRITE:
			heap_xlog_logical_rewrite(record);
			break;

		case XLOG_HEAP2_NEW_CID:

			/*
			 * No work on a real replay; this record type is only used during
			 * logical decoding.
			 */
			break;

		default:
			elog(PANIC, "heap2_redo: unknown op code %u", info);
	}
}
/*
 * Mask a heap page before performing consistency checks on it.
 *
 * pagedata is the page image to mask in place; blkno is the block number
 * used when rewriting t_ctid of speculative tuples.  Besides the page-level
 * masking (LSN/checksum, hint bits, unused space), every normal tuple has
 * its hint bits and command id masked, and padding after each stored item
 * is overwritten with MASK_MARKER.
 */
void
heap_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	OffsetNumber cur;

	mask_page_lsn_and_checksum(page);
	mask_page_hint_bits(page);
	mask_unused_space(page);

	for (cur = 1; cur <= PageGetMaxOffsetNumber(page); cur++)
	{
		ItemId		itemid = PageGetItemId(page, cur);
		char	   *item = (char *) (page + ItemIdGetOffset(itemid));
		int			itemlen;
		int			padding;

		if (ItemIdIsNormal(itemid))
		{
			HeapTupleHeader tuple = (HeapTupleHeader) item;

			if (HeapTupleHeaderXminFrozen(tuple))
			{
				/* xmin is frozen; only the xmax hint bits need masking. */
				tuple->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED);
			}
			else
			{
				/*
				 * xmin is not frozen yet, so hint bits may differ: they can
				 * be set without emitting WAL.  Ignore all of them.
				 */
				tuple->t_infomask &= ~HEAP_XACT_MASK;
			}

			/*
			 * Replay sets the command id to FirstCommandId, so it cannot be
			 * compared.  See heap_xlog_insert() for details.
			 */
			tuple->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;

			/*
			 * For a speculative tuple, heap_insert() leaves a speculative
			 * token (a per-backend monotonically increasing identifier) in
			 * the ctid field of the caller-passed tuple, and ctid is never
			 * WAL-logged.  On redo, heap_xlog_insert() sets t_ctid to the
			 * current block number and the tuple's own offset, regardless of
			 * any speculative insertion on the primary.  Do the same here so
			 * the difference is ignored.
			 */
			if (HeapTupleHeaderIsSpeculative(tuple))
				ItemPointerSet(&tuple->t_ctid, blkno, cur);

			/*
			 * NB: ctid changes caused by the tuple having moved (i.e.
			 * HeapTupleHeaderIndicatesMovedPartitions) are deliberately not
			 * masked.  That information must stay in sync between primary
			 * and standby, and is therefore WAL-logged.
			 */
		}

		/* Items without storage have no trailing padding to mask. */
		if (!ItemIdHasStorage(itemid))
			continue;

		/*
		 * Mask the padding bytes following the item when its length is not
		 * MAXALIGNed.
		 */
		itemlen = ItemIdGetLength(itemid);
		padding = MAXALIGN(itemlen) - itemlen;
		if (padding > 0)
			memset(item + itemlen, MASK_MARKER, padding);
	}
}