1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-18 13:44:19 +03:00

read_stream: Introduce and use optional batchmode support

Submitting IO in larger batches can be more efficient than doing so
one-by-one, particularly for many small reads. It does, however, require
the ReadStreamBlockNumberCB callback to abide by the restrictions of AIO
batching (c.f. pgaio_enter_batchmode()). Basically, the callback may not:
a) block without first calling pgaio_submit_staged(), unless a
   to-be-waited-on lock cannot be part of a deadlock, e.g. because it is
   never held while waiting for IO.

b) directly or indirectly start another batch pgaio_enter_batchmode()

As this requires care and is nontrivial in some cases, batching is only
used with explicit opt-in.

This patch adds an explicit flag (READ_STREAM_USE_BATCHING) to read_stream and
uses it where appropriate.

There are two cases where batching would likely be beneficial, but where we
aren't using it yet:

1) bitmap heap scans, because the callback reads the VM

   This should soon be solved, because we are planning to remove the use of
   the VM, due to that not being sound.

2) The first phase of heap vacuum

   This could be made to support batchmode, but would require some care.

Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
This commit is contained in:
Andres Freund 2025-03-30 18:30:36 -04:00
parent f4d0730bbc
commit ae3df4b341
12 changed files with 129 additions and 12 deletions

View File

@ -447,12 +447,23 @@ verify_heapam(PG_FUNCTION_ARGS)
if (skip_option == SKIP_PAGES_NONE) if (skip_option == SKIP_PAGES_NONE)
{ {
/*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
stream_cb = block_range_read_stream_cb; stream_cb = block_range_read_stream_cb;
stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL; stream_flags = READ_STREAM_SEQUENTIAL |
READ_STREAM_FULL |
READ_STREAM_USE_BATCHING;
stream_data = &stream_skip_data.range; stream_data = &stream_skip_data.range;
} }
else else
{ {
/*
* It would not be safe to naively use use batchmode, as
* heapcheck_read_stream_next_unskippable takes locks. It shouldn't be
* too hard to convert though.
*/
stream_cb = heapcheck_read_stream_next_unskippable; stream_cb = heapcheck_read_stream_next_unskippable;
stream_flags = READ_STREAM_DEFAULT; stream_flags = READ_STREAM_DEFAULT;
stream_data = &stream_skip_data; stream_data = &stream_skip_data;

View File

@ -198,7 +198,12 @@ pg_prewarm(PG_FUNCTION_ARGS)
p.current_blocknum = first_block; p.current_blocknum = first_block;
p.last_exclusive = last_block + 1; p.last_exclusive = last_block + 1;
stream = read_stream_begin_relation(READ_STREAM_FULL, /*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
stream = read_stream_begin_relation(READ_STREAM_FULL |
READ_STREAM_USE_BATCHING,
NULL, NULL,
rel, rel,
forkNumber, forkNumber,

View File

@ -526,7 +526,13 @@ collect_visibility_data(Oid relid, bool include_pd)
{ {
p.current_blocknum = 0; p.current_blocknum = 0;
p.last_exclusive = nblocks; p.last_exclusive = nblocks;
stream = read_stream_begin_relation(READ_STREAM_FULL,
/*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
stream = read_stream_begin_relation(READ_STREAM_FULL |
READ_STREAM_USE_BATCHING,
bstrategy, bstrategy,
rel, rel,
MAIN_FORKNUM, MAIN_FORKNUM,

View File

@ -210,7 +210,13 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
needLock = !RELATION_IS_LOCAL(rel); needLock = !RELATION_IS_LOCAL(rel);
p.current_blocknum = GIST_ROOT_BLKNO; p.current_blocknum = GIST_ROOT_BLKNO;
stream = read_stream_begin_relation(READ_STREAM_FULL,
/*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
stream = read_stream_begin_relation(READ_STREAM_FULL |
READ_STREAM_USE_BATCHING,
info->strategy, info->strategy,
rel, rel,
MAIN_FORKNUM, MAIN_FORKNUM,

View File

@ -1206,7 +1206,15 @@ heap_beginscan(Relation relation, Snapshot snapshot,
else else
cb = heap_scan_stream_read_next_serial; cb = heap_scan_stream_read_next_serial;
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, /* ---
* It is safe to use batchmode as the only locks taken by `cb`
* are never taken while waiting for IO:
* - SyncScanLock is used in the non-parallel case
* - in the parallel case, only spinlocks and atomics are used
* ---
*/
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL |
READ_STREAM_USE_BATCHING,
scan->rs_strategy, scan->rs_strategy,
scan->rs_base.rs_rd, scan->rs_base.rs_rd,
MAIN_FORKNUM, MAIN_FORKNUM,
@ -1216,6 +1224,12 @@ heap_beginscan(Relation relation, Snapshot snapshot,
} }
else if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN) else if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN)
{ {
/*
* Currently we can't trivially use batching, due to the
* VM_ALL_VISIBLE check in bitmapheap_stream_read_next. While that
* could be made safe, we are about to remove the all-visible logic
* from bitmap scans due to its unsoundness.
*/
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_DEFAULT, scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
scan->rs_strategy, scan->rs_strategy,
scan->rs_base.rs_rd, scan->rs_base.rs_rd,

View File

@ -1225,7 +1225,12 @@ lazy_scan_heap(LVRelState *vacrel)
vacrel->next_unskippable_eager_scanned = false; vacrel->next_unskippable_eager_scanned = false;
vacrel->next_unskippable_vmbuffer = InvalidBuffer; vacrel->next_unskippable_vmbuffer = InvalidBuffer;
/* Set up the read stream for vacuum's first pass through the heap */ /*
* Set up the read stream for vacuum's first pass through the heap.
*
* This could be made safe for READ_STREAM_USE_BATCHING, but only with
* explicit work in heap_vac_scan_next_block.
*/
stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE, stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
vacrel->bstrategy, vacrel->bstrategy,
vacrel->rel, vacrel->rel,
@ -2669,6 +2674,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
* Read stream callback for vacuum's third phase (second pass over the heap). * Read stream callback for vacuum's third phase (second pass over the heap).
* Gets the next block from the TID store and returns it or InvalidBlockNumber * Gets the next block from the TID store and returns it or InvalidBlockNumber
* if there are no further blocks to vacuum. * if there are no further blocks to vacuum.
*
* NB: Assumed to be safe to use with READ_STREAM_USE_BATCHING.
*/ */
static BlockNumber static BlockNumber
vacuum_reap_lp_read_stream_next(ReadStream *stream, vacuum_reap_lp_read_stream_next(ReadStream *stream,
@ -2732,8 +2739,16 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
iter = TidStoreBeginIterate(vacrel->dead_items); iter = TidStoreBeginIterate(vacrel->dead_items);
/* Set up the read stream for vacuum's second pass through the heap */ /*
stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE, * Set up the read stream for vacuum's second pass through the heap.
*
* It is safe to use batchmode, as vacuum_reap_lp_read_stream_next() does
* not need to wait for IO and does not perform locking. Once we support
* parallelism it should still be fine, as presumably the holder of locks
* would never be blocked by IO while holding the lock.
*/
stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
READ_STREAM_USE_BATCHING,
vacrel->bstrategy, vacrel->bstrategy,
vacrel->rel, vacrel->rel,
MAIN_FORKNUM, MAIN_FORKNUM,

View File

@ -1064,7 +1064,13 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
needLock = !RELATION_IS_LOCAL(rel); needLock = !RELATION_IS_LOCAL(rel);
p.current_blocknum = BTREE_METAPAGE + 1; p.current_blocknum = BTREE_METAPAGE + 1;
stream = read_stream_begin_relation(READ_STREAM_FULL,
/*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
stream = read_stream_begin_relation(READ_STREAM_FULL |
READ_STREAM_USE_BATCHING,
info->strategy, info->strategy,
rel, rel,
MAIN_FORKNUM, MAIN_FORKNUM,

View File

@ -822,7 +822,13 @@ spgvacuumscan(spgBulkDeleteState *bds)
/* We can skip locking for new or temp relations */ /* We can skip locking for new or temp relations */
needLock = !RELATION_IS_LOCAL(index); needLock = !RELATION_IS_LOCAL(index);
p.current_blocknum = SPGIST_METAPAGE_BLKNO + 1; p.current_blocknum = SPGIST_METAPAGE_BLKNO + 1;
stream = read_stream_begin_relation(READ_STREAM_FULL,
/*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
stream = read_stream_begin_relation(READ_STREAM_FULL |
READ_STREAM_USE_BATCHING,
bds->info->strategy, bds->info->strategy,
index, index,
MAIN_FORKNUM, MAIN_FORKNUM,

View File

@ -1237,7 +1237,12 @@ acquire_sample_rows(Relation onerel, int elevel,
scan = table_beginscan_analyze(onerel); scan = table_beginscan_analyze(onerel);
slot = table_slot_create(onerel, NULL); slot = table_slot_create(onerel, NULL);
stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE, /*
* It is safe to use batching, as block_sampling_read_stream_next never
* blocks.
*/
stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
READ_STREAM_USE_BATCHING,
vac_strategy, vac_strategy,
scan->rs_rd, scan->rs_rd,
MAIN_FORKNUM, MAIN_FORKNUM,

View File

@ -102,6 +102,7 @@ struct ReadStream
int16 initialized_buffers; int16 initialized_buffers;
int read_buffers_flags; int read_buffers_flags;
bool sync_mode; /* using io_method=sync */ bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
bool advice_enabled; bool advice_enabled;
bool temporary; bool temporary;
@ -403,6 +404,15 @@ read_stream_start_pending_read(ReadStream *stream)
static void static void
read_stream_look_ahead(ReadStream *stream) read_stream_look_ahead(ReadStream *stream)
{ {
/*
* Allow amortizing the cost of submitting IO over multiple IOs. This
* requires that we don't do any operations that could lead to a deadlock
* with staged-but-unsubmitted IO. The callback needs to opt-in to being
* careful.
*/
if (stream->batch_mode)
pgaio_enter_batchmode();
while (stream->ios_in_progress < stream->max_ios && while (stream->ios_in_progress < stream->max_ios &&
stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
{ {
@ -450,6 +460,8 @@ read_stream_look_ahead(ReadStream *stream)
{ {
/* We've hit the buffer or I/O limit. Rewind and stop here. */ /* We've hit the buffer or I/O limit. Rewind and stop here. */
read_stream_unget_block(stream, blocknum); read_stream_unget_block(stream, blocknum);
if (stream->batch_mode)
pgaio_exit_batchmode();
return; return;
} }
} }
@ -484,6 +496,9 @@ read_stream_look_ahead(ReadStream *stream)
* time. * time.
*/ */
Assert(stream->pinned_buffers > 0 || stream->distance == 0); Assert(stream->pinned_buffers > 0 || stream->distance == 0);
if (stream->batch_mode)
pgaio_exit_batchmode();
} }
/* /*
@ -617,6 +632,7 @@ read_stream_begin_impl(int flags,
MAXALIGN(&stream->ios[Max(1, max_ios)]); MAXALIGN(&stream->ios[Max(1, max_ios)]);
stream->sync_mode = io_method == IOMETHOD_SYNC; stream->sync_mode = io_method == IOMETHOD_SYNC;
stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
#ifdef USE_PREFETCH #ifdef USE_PREFETCH

View File

@ -5100,7 +5100,13 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
p.current_blocknum = 0; p.current_blocknum = 0;
p.last_exclusive = nblocks; p.last_exclusive = nblocks;
src_smgr = smgropen(srclocator, INVALID_PROC_NUMBER); src_smgr = smgropen(srclocator, INVALID_PROC_NUMBER);
src_stream = read_stream_begin_smgr_relation(READ_STREAM_FULL,
/*
* It is safe to use batchmode as block_range_read_stream_cb takes no
* locks.
*/
src_stream = read_stream_begin_smgr_relation(READ_STREAM_FULL |
READ_STREAM_USE_BATCHING,
bstrategy_src, bstrategy_src,
src_smgr, src_smgr,
permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED, permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED,

View File

@ -42,6 +42,27 @@
*/ */
#define READ_STREAM_FULL 0x04 #define READ_STREAM_FULL 0x04
/* ---
* Opt-in to using AIO batchmode.
*
* Submitting IO in larger batches can be more efficient than doing so
* one-by-one, particularly for many small reads. It does, however, require
* the ReadStreamBlockNumberCB callback to abide by the restrictions of AIO
* batching (c.f. pgaio_enter_batchmode()). Basically, the callback may not:
*
* a) block without first calling pgaio_submit_staged(), unless a
* to-be-waited-on lock cannot be part of a deadlock, e.g. because it is
* never held while waiting for IO.
*
* b) start another batch (without first exiting batchmode and re-entering
* before returning)
*
* As this requires care and is nontrivial in some cases, batching is only
* used with explicit opt-in.
* ---
*/
#define READ_STREAM_USE_BATCHING 0x08
struct ReadStream; struct ReadStream;
typedef struct ReadStream ReadStream; typedef struct ReadStream ReadStream;