1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Use streaming read I/O in heap amcheck

Instead of directly invoking ReadBuffer() for each unskippable block in
the heap relation, verify_heapam() now uses the read stream API to
acquire the next buffer to check for corruption.

Author: Matheus Alcantara <matheusssilv97@gmail.com>
Co-authored-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Kirill Reshke <reshkekirill@gmail.com>
Reviewed-by: jian he <jian.universality@gmail.com>
Discussion: https://postgr.es/m/flat/CAFY6G8eLyz7%2BsccegZYFj%3D5tAUR-GZ9uEq4Ch5gvwKqUwb_hCA%40mail.gmail.com
This commit is contained in:
Melanie Plageman
2025-03-27 14:02:40 -04:00
parent 4623d71443
commit 043799fa08
2 changed files with 109 additions and 25 deletions

View File

@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@ -118,7 +119,10 @@ typedef struct HeapCheckContext
Relation valid_toast_index;
int num_toast_indexes;
/* Values for iterating over pages in the relation */
/*
* Values for iterating over pages in the relation. `blkno` is the most
* recent block in the buffer yielded by the read stream API.
*/
BlockNumber blkno;
BufferAccessStrategy bstrategy;
Buffer buffer;
@ -153,7 +157,32 @@ typedef struct HeapCheckContext
Tuplestorestate *tupstore;
} HeapCheckContext;
/*
* The per-relation data provided to the read stream API for heap amcheck to
* use in its callback for the SKIP_PAGES_ALL_FROZEN and
* SKIP_PAGES_ALL_VISIBLE options.
*/
typedef struct HeapCheckReadStreamData
{
/*
* `range` is used by all SkipPages options. SKIP_PAGES_NONE uses the
* default read stream callback, block_range_read_stream_cb(), which takes
* a BlockRangeReadStreamPrivate as its callback_private_data. `range`
* keeps track of the current block number across
* read_stream_next_buffer() invocations.
*/
BlockRangeReadStreamPrivate range;
SkipPages skip_option;
Relation rel;
Buffer *vmbuffer;
} HeapCheckReadStreamData;
/* Internal implementation */
static BlockNumber heapcheck_read_stream_next_unskippable(ReadStream *stream,
void *callback_private_data,
void *per_buffer_data);
static void check_tuple(HeapCheckContext *ctx,
bool *xmin_commit_status_ok,
XidCommitStatus *xmin_commit_status);
@ -231,6 +260,11 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
ReadStream *stream;
int stream_flags;
ReadStreamBlockNumberCB stream_cb;
void *stream_data;
HeapCheckReadStreamData stream_skip_data;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@ -404,7 +438,35 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
/* Now that `ctx` is set up, set up the read stream */
stream_skip_data.range.current_blocknum = first_block;
stream_skip_data.range.last_exclusive = last_block + 1;
stream_skip_data.skip_option = skip_option;
stream_skip_data.rel = ctx.rel;
stream_skip_data.vmbuffer = &vmbuffer;
if (skip_option == SKIP_PAGES_NONE)
{
stream_cb = block_range_read_stream_cb;
stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
stream_data = &stream_skip_data.range;
}
else
{
stream_cb = heapcheck_read_stream_next_unskippable;
stream_flags = READ_STREAM_DEFAULT;
stream_data = &stream_skip_data;
}
stream = read_stream_begin_relation(stream_flags,
ctx.bstrategy,
ctx.rel,
MAIN_FORKNUM,
stream_cb,
stream_data,
0);
while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@ -417,30 +479,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
/* Optionally skip over all-frozen or all-visible blocks */
if (skip_option != SKIP_PAGES_NONE)
{
int32 mapbits;
mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
&vmbuffer);
if (skip_option == SKIP_PAGES_ALL_FROZEN)
{
if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
continue;
}
if (skip_option == SKIP_PAGES_ALL_VISIBLE)
{
if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
continue;
}
}
/* Read and lock the next page. */
ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
RBM_NORMAL, ctx.bstrategy);
/* Lock the next page. */
Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@ -799,6 +842,10 @@ verify_heapam(PG_FUNCTION_ARGS)
break;
}
/* Ensure that the stream is completely read */
Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
@ -815,6 +862,42 @@ verify_heapam(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
/*
* Heap amcheck's read stream callback for getting the next unskippable block.
* This callback is only used when 'all-visible' or 'all-frozen' is provided
* as the skip option to verify_heapam(). With the default 'none',
* block_range_read_stream_cb() is used instead.
*/
static BlockNumber
heapcheck_read_stream_next_unskippable(ReadStream *stream,
void *callback_private_data,
void *per_buffer_data)
{
HeapCheckReadStreamData *p = callback_private_data;
/* Loops over [current_blocknum, last_exclusive) blocks */
for (BlockNumber i; (i = p->range.current_blocknum++) < p->range.last_exclusive;)
{
uint8 mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
{
if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
continue;
}
if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
{
if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
continue;
}
return i;
}
return InvalidBlockNumber;
}
/*
* Shared internal implementation for report_corruption and
* report_toast_corruption.