1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

aio: Add test_aio module

To make the tests possible, a few functions from bufmgr.c/localbuf.c had to be
exported, via buf_internals.h.

Reviewed-by: Noah Misch <noah@leadboat.com>
Co-authored-by: Andres Freund <andres@anarazel.de>
Co-authored-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
This commit is contained in:
Andres Freund
2025-04-01 13:47:46 -04:00
parent 60f566b4f2
commit 93bc3d75d8
13 changed files with 2622 additions and 8 deletions

View File

@@ -518,10 +518,6 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
uint32 set_flag_bits, bool forget_owner,
bool release_aio);
static void AbortBufferIO(Buffer buffer);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
@@ -5962,7 +5958,7 @@ WaitIO(BufferDesc *buf)
* find out if they can perform the I/O as part of a larger operation, without
* waiting for the answer or distinguishing the reasons why not.
*/
static bool
bool
StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
{
uint32 buf_state;
@@ -6019,7 +6015,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
* resource owner. (forget_owner=false is used when the resource owner itself
* is being released)
*/
static void
void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
bool forget_owner, bool release_aio)
{

View File

@@ -57,7 +57,6 @@ static int NLocalPinnedBuffers = 0;
static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
static Buffer GetLocalVictimBuffer(void);
static void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
/*
@@ -597,7 +596,7 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit
*
* See also InvalidateBuffer().
*/
static void
void
InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
{
Buffer buffer = BufferDescriptorGetBuffer(bufHdr);

View File

@@ -434,6 +434,12 @@ extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_co
extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
IOContext io_context, BufferTag *tag);
/* solely to make it easier to write tests */
extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
bool forget_owner, bool release_aio);
/* freelist.c */
extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
@@ -478,6 +484,7 @@ extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
uint32 set_flag_bits, bool release_aio);
extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
extern void DropRelationLocalBuffers(RelFileLocator rlocator,
ForkNumber forkNum,
BlockNumber firstDelBlock);

View File

@@ -14,6 +14,7 @@ SUBDIRS = \
oauth_validator \
plsample \
spgist_name_ops \
test_aio \
test_bloomfilter \
test_copy_callbacks \
test_custom_rmgrs \

View File

@@ -13,6 +13,7 @@ subdir('oauth_validator')
subdir('plsample')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_aio')
subdir('test_bloomfilter')
subdir('test_copy_callbacks')
subdir('test_custom_rmgrs')

2
src/test/modules/test_aio/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
# Generated subdirectories
/tmp_check/

View File

@@ -0,0 +1,26 @@
# src/test/modules/test_aio/Makefile

# Build one loadable module, test_aio, from test_aio.o (plus whatever
# $(WIN32RES) expands to).
PGFILEDESC = "test_aio - test code for AIO"
MODULE_big = test_aio
OBJS = \
$(WIN32RES) \
test_aio.o

# Extension name and the SQL script that is installed for it.
EXTENSION = test_aio
DATA = test_aio--1.0.sql

# Enable the TAP test suite for this module.
TAP_TESTS = 1

# Pass the enable_injection_points make variable on to child processes through
# the environment.
export enable_injection_points

# Two build modes: against an installed PostgreSQL via PGXS, or in-tree using
# the top-level Makefile.global.
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/test_aio
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@@ -0,0 +1,37 @@
# Copyright (c) 2024-2025, PostgreSQL Global Development Group

# Sources of the test_aio shared module.
test_aio_sources = files(
'test_aio.c',
)

# On Windows, additionally generate a version resource for the module.
if host_system == 'windows'
test_aio_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
'--NAME', 'test_aio',
'--FILEDESC', 'test_aio - test code for AIO',])
endif

# Build the module and register it with the libraries installed for tests.
test_aio = shared_module('test_aio',
test_aio_sources,
kwargs: pg_test_mod_args,
)
test_install_libs += test_aio

# Extension control file and SQL script, installed for tests.
test_install_data += files(
'test_aio.control',
'test_aio--1.0.sql',
)

# Register the TAP tests; enable_injection_points is exported to them as
# 'yes' or 'no' depending on the injection_points build option.
tests += {
'name': 'test_aio',
'sd': meson.current_source_dir(),
'bd': meson.current_build_dir(),
'tap': {
'env': {
'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
},
'tests': [
't/001_aio.pl',
't/002_io_workers.pl',
],
},
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,125 @@
# Copyright (c) 2025, PostgreSQL Global Development Group
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use List::Util qw(shuffle);
# Set up a single node that uses io_method=worker, so that there are I/O
# worker processes whose number can be changed and which can be signalled.
my $node = PostgreSQL::Test::Cluster->new('worker');
$node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
));
$node->start();
# Test changing the number of I/O worker processes while also evaluating the
# handling of their termination.
test_number_of_io_workers_dynamic($node);
$node->stop();
done_testing();
# Exercise runtime changes of io_workers on the given node.
#
# Out-of-range values (0 and 33) must be rejected.  After that, eight valid
# settings are applied in turn: both ends of the valid range, followed by six
# values picked at random.  Each valid setting is verified by
# change_number_of_io_workers(), which also kills one worker and checks that
# the worker count recovers.
#
# Arguments: the PostgreSQL::Test::Cluster node to test against.
sub test_number_of_io_workers_dynamic
{
	my $node = shift;

	my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');

	# Verify that worker count can't be set to 0
	change_number_of_io_workers($node, 0, $prev_worker_count, 1);

	# Verify that worker count can't be set to 33 (above the max)
	change_number_of_io_workers($node, 33, $prev_worker_count, 1);

	# Try changing IO workers to a random value and verify that the worker
	# count ends up as expected. Always test the min/max of workers.
	#
	# Valid range for io_workers is [1, 32]. 8 tests in total seems
	# reasonable.
	my @io_workers_range = shuffle(1 .. 32);

	# The slice must use a range: "[ 0, 6 ]" would select only the two
	# elements at index 0 and 6, giving four tests instead of eight.
	foreach my $worker_count (1, 32, @io_workers_range[ 0 .. 5 ])
	{
		$prev_worker_count =
		  change_number_of_io_workers($node, $worker_count,
			$prev_worker_count, 0);
	}
}
# Attempt to set io_workers to $worker_count using ALTER SYSTEM followed by a
# configuration reload, and check the outcome.
#
# Arguments:
#   $node              - node to operate on
#   $worker_count      - value to set io_workers to
#   $prev_worker_count - value io_workers had before this call
#   $expect_failure    - true if the ALTER SYSTEM is expected to be rejected
#
# Returns the io_workers value in effect afterwards: $prev_worker_count when
# a failure was expected, $worker_count otherwise.
sub change_number_of_io_workers
{
	my ($node, $worker_count, $prev_worker_count, $expect_failure) = @_;

	# Only stderr is of interest, the exit code and stdout are ignored.
	my (undef, undef, $stderr) =
	  $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
	$node->safe_psql('postgres', 'SELECT pg_reload_conf()');

	if ($expect_failure)
	{
		# like() rather than ok(): on failure it prints the stderr that was
		# actually received, which makes the test log useful.
		like(
			$stderr,
			qr/$worker_count is outside the valid range for parameter "io_workers"/,
			"updating number of io_workers to $worker_count failed, as expected"
		);

		return $prev_worker_count;
	}

	is( $node->safe_psql('postgres', 'SHOW io_workers'),
		$worker_count,
		"updating number of io_workers from $prev_worker_count to $worker_count"
	);

	# The number of running workers has to match the setting, both before
	# and after one of the workers got terminated.
	check_io_worker_count($node, $worker_count);
	terminate_io_worker($node, $worker_count);
	check_io_worker_count($node, $worker_count);

	return $worker_count;
}
# Send SIGINT to one randomly chosen I/O worker of $node and wait until its
# pid is no longer listed in pg_stat_activity.
#
# Arguments: the node, and the configured worker count (currently unused).
sub terminate_io_worker
{
	my ($node, $worker_count) = @_;

	# Select a random io worker
	my $pid = $node->safe_psql(
		'postgres',
		qq(SELECT pid FROM pg_stat_activity WHERE
backend_type = 'io worker' ORDER BY RANDOM() LIMIT 1));

	# terminate IO worker with SIGINT
	my $kill_status =
	  PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'INT', $pid);
	is($kill_status, 0, "random io worker process signalled with INT");

	# Check that worker exits
	my $exited = $node->poll_query_until('postgres',
		qq(SELECT COUNT(*) FROM pg_stat_activity WHERE pid = $pid), '0');
	ok($exited, "random io worker process exited after signal");
}
# Poll pg_stat_activity on $node until the number of 'io worker' backends
# equals $worker_count, and report the outcome as a test result.
sub check_io_worker_count
{
	my ($node, $worker_count) = @_;

	my $count_query =
	  qq(SELECT COUNT(*) FROM pg_stat_activity WHERE backend_type = 'io worker');
	my $reached =
	  $node->poll_query_until('postgres', $count_query, $worker_count);

	ok($reached, "io worker count is $worker_count");
}

View File

@@ -0,0 +1,108 @@
/* src/test/modules/test_aio/test_aio--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_aio" to load this file. \quit
-- Translate an errno symbol name (EIO, EAGAIN, EINTR, ENOSPC, EROFS) into its
-- numeric value; other names raise an error.
CREATE FUNCTION errno_from_string(sym text)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Extend the main fork of rel by nblocks blocks.
CREATE FUNCTION grow_rel(rel regclass, nblocks int)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Rewrite one block of rel on disk, optionally zeroing it and/or corrupting
-- its page header or checksum.
CREATE FUNCTION modify_rel_block(rel regclass, blockno int,
zero bool DEFAULT false,
corrupt_header bool DEFAULT false,
corrupt_checksum bool DEFAULT false)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- "Low level" read of nblocks blocks starting at blockno, issued directly
-- through the AIO interface, with control over batch mode, smgrreleaseall()
-- and whether to wait for completion.
CREATE FUNCTION read_rel_block_ll(
rel regclass,
blockno int,
nblocks int DEFAULT 1,
wait_complete bool DEFAULT true,
batchmode_enter bool DEFAULT false,
smgrreleaseall bool DEFAULT false,
batchmode_exit bool DEFAULT false,
zero_on_error bool DEFAULT false)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- If the block is currently buffered with valid contents, flush it if dirty
-- and remove it from the buffer pool.
CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Make sure a buffer exists for the block, without caring about its
-- contents; returns the buffer number.
CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Call StartBufferIO() / StartLocalBufferIO() on the buffer and return its
-- result.
CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
RETURNS pg_catalog.bool STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Call TerminateBufferIO() / TerminateLocalBufferIO() on the buffer, with
-- flags derived from the arguments.
CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed bool, io_error bool, release_aio bool)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
/*
 * Handle related functions
 */

-- Acquire an AIO handle, then raise an error.
CREATE FUNCTION handle_get_and_error()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Acquire two AIO handles in a row without releasing the first.
CREATE FUNCTION handle_get_twice()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Acquire an AIO handle and remember it, without releasing it.
CREATE FUNCTION handle_get()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Acquire an AIO handle and release it again right away.
CREATE FUNCTION handle_get_release()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Release the handle remembered by handle_get().
CREATE FUNCTION handle_release_last()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
/*
 * Batchmode related functions
 */

-- Calls pgaio_enter_batchmode().
CREATE FUNCTION batch_start()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Calls pgaio_exit_batchmode().
CREATE FUNCTION batch_end()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
/*
 * Injection point related functions
 *
 * All of these raise an error when the server was built without injection
 * point support.
 */

-- Enable the short-read injection: reads whose result is at least "result"
-- get their result replaced with it.
-- NOTE(review): the C implementation checks whether the argument is NULL,
-- but because the function is declared STRICT it is not executed for NULL
-- input. Confirm whether STRICT is intended here.
CREATE FUNCTION inj_io_short_read_attach(result int)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Disable the short-read injection again.
CREATE FUNCTION inj_io_short_read_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Enable the injected error at the AIO_WORKER_AFTER_REOPEN injection point.
CREATE FUNCTION inj_io_reopen_attach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

-- Disable the injected reopen error again.
CREATE FUNCTION inj_io_reopen_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

View File

@@ -0,0 +1,806 @@
/*-------------------------------------------------------------------------
*
* test_aio.c
* Helpers to write tests for AIO
*
* This module provides interface functions for C functionality to SQL, to
* make it possible to test AIO related behavior in a targeted way from SQL.
* It'd not generally be safe to export these functions to SQL, but for a test
* that's fine.
*
* Copyright (c) 2020-2025, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_aio/test_aio.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/relation.h"
#include "fmgr.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/checksum.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
PG_MODULE_MAGIC;
/*
 * State controlling what this module's injection point callbacks do. It is
 * allocated in shared memory by test_aio_shmem_startup() and changed by the
 * inj_io_*_attach() / inj_io_*_detach() SQL functions.
 */
typedef struct InjIoErrorState
{
	/* if set, inj_io_short_read() may replace the result of a read */
	bool		enabled_short_read;

	/* if set, inj_io_reopen() raises an ERROR */
	bool		enabled_reopen;

	/* was a (non-NULL) result passed to inj_io_short_read_attach()? */
	bool		short_read_result_set;

	/* the result inj_io_short_read() substitutes for the real one */
	int			short_read_result;
} InjIoErrorState;

/* the shared state, set up in test_aio_shmem_startup() */
static InjIoErrorState * inj_io_error_state;

/* Shared memory init callbacks */
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/* handle acquired by the last handle_get(), used by handle_release_last() */
static PgAioHandle *last_handle;
/*
 * shmem_request_hook: after running any previously installed hook, reserve
 * enough shared memory for one InjIoErrorState.
 */
static void
test_aio_shmem_request(void)
{
	if (prev_shmem_request_hook != NULL)
		prev_shmem_request_hook();

	RequestAddinShmemSpace(sizeof(InjIoErrorState));
}
/*
 * shmem_startup_hook: create, or attach to, the shared InjIoErrorState.
 *
 * Whoever creates the state initializes it and attaches this module's
 * callbacks to the two AIO injection points. Processes that find the state
 * already present only load the injection points.
 */
static void
test_aio_shmem_startup(void)
{
	bool		found;

	if (prev_shmem_startup_hook)
		prev_shmem_startup_hook();

	/* Create or attach to the shared memory state */
	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	/*
	 * Use a key specific to this module. A generic key could be shared with
	 * another library's allocation, in which case both would end up
	 * interpreting the same memory using different struct layouts.
	 */
	inj_io_error_state = ShmemInitStruct("test_aio injection state",
										 sizeof(InjIoErrorState),
										 &found);

	if (!found)
	{
		/* First time through, initialize every field explicitly */
		inj_io_error_state->enabled_short_read = false;
		inj_io_error_state->enabled_reopen = false;
		inj_io_error_state->short_read_result_set = false;
		inj_io_error_state->short_read_result = 0;

#ifdef USE_INJECTION_POINTS
		InjectionPointAttach("AIO_PROCESS_COMPLETION_BEFORE_SHARED",
							 "test_aio",
							 "inj_io_short_read",
							 NULL,
							 0);
		InjectionPointLoad("AIO_PROCESS_COMPLETION_BEFORE_SHARED");

		InjectionPointAttach("AIO_WORKER_AFTER_REOPEN",
							 "test_aio",
							 "inj_io_reopen",
							 NULL,
							 0);
		InjectionPointLoad("AIO_WORKER_AFTER_REOPEN");
#endif
	}
	else
	{
		/*
		 * Pre-load the injection points now, so we can call them in a
		 * critical section.
		 */
#ifdef USE_INJECTION_POINTS
		InjectionPointLoad("AIO_PROCESS_COMPLETION_BEFORE_SHARED");
		InjectionPointLoad("AIO_WORKER_AFTER_REOPEN");
		elog(LOG, "injection point loaded");
#endif
	}

	LWLockRelease(AddinShmemInitLock);
}
/*
 * Module load callback. The shared memory hooks are only installed while
 * shared_preload_libraries is being processed; when loaded at any other
 * time, nothing is set up.
 */
void
_PG_init(void)
{
	if (process_shared_preload_libraries_in_progress)
	{
		/* remember the previous hooks, so ours can chain to them */
		prev_shmem_request_hook = shmem_request_hook;
		shmem_request_hook = test_aio_shmem_request;

		prev_shmem_startup_hook = shmem_startup_hook;
		shmem_startup_hook = test_aio_shmem_startup;
	}
}
/*
 * SQL-callable: map the name of an errno symbol to the platform's numeric
 * value. Only the symbols listed in the table below are supported; any other
 * name raises an error.
 */
PG_FUNCTION_INFO_V1(errno_from_string);
Datum
errno_from_string(PG_FUNCTION_ARGS)
{
	static const struct
	{
		const char *name;
		int			value;
	}			known_errnos[] = {
		{"EIO", EIO},
		{"EAGAIN", EAGAIN},
		{"EINTR", EINTR},
		{"ENOSPC", ENOSPC},
		{"EROFS", EROFS},
	};
	const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));

	for (int i = 0; i < (int) lengthof(known_errnos); i++)
	{
		if (strcmp(sym, known_errnos[i].name) == 0)
			PG_RETURN_INT32(known_errnos[i].value);
	}

	ereport(ERROR,
			errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			errmsg_internal("%s is not a supported errno value", sym));

	/* not reached, ereport(ERROR) does not return */
	PG_RETURN_INT32(0);
}
/*
 * SQL-callable: extend the main fork of a relation by the requested number of
 * blocks, in chunks of at most MAX_BUFFERS_TO_EXTEND_BY blocks. The buffers
 * handed back by each extension are released again right away.
 */
PG_FUNCTION_INFO_V1(grow_rel);
Datum
grow_rel(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	uint32		remaining = PG_GETARG_UINT32(1);
	Relation	rel;
#define MAX_BUFFERS_TO_EXTEND_BY 64
	Buffer		victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];

	rel = relation_open(relid, AccessExclusiveLock);

	while (remaining > 0)
	{
		/* in: how many blocks we ask for; out: how many we actually got */
		uint32		chunk = Min(remaining, MAX_BUFFERS_TO_EXTEND_BY);
		uint32		idx = 0;

		ExtendBufferedRelBy(BMR_REL(rel),
							MAIN_FORKNUM,
							NULL,
							0,
							chunk,
							victim_buffers,
							&chunk);

		remaining -= chunk;

		while (idx < chunk)
		{
			ReleaseBuffer(victim_buffers[idx]);
			idx++;
		}
	}

	/* the lock is kept until end of transaction */
	relation_close(rel, NoLock);

	PG_RETURN_VOID();
}
/*
 * SQL-callable: rewrite a block of a relation directly on disk.
 *
 * The current contents of the block are read through the buffer manager and
 * copied; the buffer is then dropped from the buffer pool. The copy is
 * modified as requested (zeroed, page header corrupted, checksum corrupted)
 * and written back using smgrwrite(), bypassing the buffer pool.
 *
 * Arguments: relation, block number, zero, corrupt_header, corrupt_checksum.
 */
PG_FUNCTION_INFO_V1(modify_rel_block);
Datum
modify_rel_block(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	BlockNumber blkno = PG_GETARG_UINT32(1);
	bool		zero = PG_GETARG_BOOL(2);
	bool		corrupt_header = PG_GETARG_BOOL(3);
	bool		corrupt_checksum = PG_GETARG_BOOL(4);
	/* NOTE(review): never pfree'd here; presumably left to the memory context */
	Page		page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
	Relation	rel;
	Buffer		buf;
	PageHeader	ph;

	rel = relation_open(relid, AccessExclusiveLock);

	/* RBM_ZERO_ON_ERROR, so that an already damaged block can be read */
	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
							 RBM_ZERO_ON_ERROR, NULL);

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * copy the page to local memory, seems nicer than to directly modify in
	 * the buffer pool.
	 */
	memcpy(page, BufferGetPage(buf), BLCKSZ);

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	ReleaseBuffer(buf);

	/*
	 * Don't want to have a buffer in-memory that's marked valid where the
	 * on-disk contents are invalid. Particularly not if the in-memory buffer
	 * could be dirty...
	 *
	 * While we hold an AEL on the relation nobody else should be able to read
	 * the buffer in.
	 *
	 * NB: This is probably racy, better don't copy this to non-test code.
	 */
	if (BufferIsLocal(buf))
		InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
	else
		EvictUnpinnedBuffer(buf);

	/*
	 * Now modify the page as asked for by the caller.
	 */
	if (zero)
		memset(page, 0, BufferGetPageSize(buf));

	/* corrupting an empty page requires a page header to exist first */
	if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
		PageInit(page, BufferGetPageSize(buf), 0);

	ph = (PageHeader) page;

	/* a special-space offset beyond the end of the block */
	if (corrupt_header)
		ph->pd_special = BLCKSZ + 1;

	if (corrupt_checksum)
	{
		bool		successfully_corrupted = 0;

		/*
		 * Any single modification of the checksum could just end up being
		 * valid again, due to e.g. corrupt_header changing the data in a way
		 * that'd result in the "corrupted" checksum, or the checksum already
		 * being invalid. Retry in that, unlikely, case.
		 */
		for (int i = 0; i < 100; i++)
		{
			uint16		verify_checksum;
			uint16		old_checksum;

			old_checksum = ph->pd_checksum;
			ph->pd_checksum = old_checksum + 1;

			elog(LOG, "corrupting checksum of blk %u from %u to %u",
				 blkno, old_checksum, ph->pd_checksum);

			/* done once the stored value differs from the computed one */
			verify_checksum = pg_checksum_page(page, blkno);
			if (verify_checksum != ph->pd_checksum)
			{
				successfully_corrupted = true;
				break;
			}
		}

		if (!successfully_corrupted)
			elog(ERROR, "could not corrupt checksum, what's going on?");
	}
	else
	{
		/* no checksum corruption requested, store a checksum for the page */
		PageSetChecksumInplace(page, blkno);
	}

	/* write the modified copy out, bypassing the buffer pool */
	smgrwrite(RelationGetSmgr(rel),
			  MAIN_FORKNUM, blkno, page, true);

	relation_close(rel, NoLock);

	PG_RETURN_VOID();
}
/*
 * Ensures a buffer for rel & blkno is in shared buffers, without actually
 * caring about the buffer contents. Used to set up test scenarios.
 *
 * The buffer is returned still pinned, but with its content lock released
 * and with BM_VALID and BM_DIRTY cleared. Errors out, after restoring the
 * buffer header, if the header's refcount shows more than one pin.
 */
static Buffer
create_toy_buffer(Relation rel, BlockNumber blkno)
{
	Buffer		buf;
	BufferDesc *buf_hdr;
	uint32		buf_state;
	bool		was_pinned = false;

	/* place buffer in shared buffers without erroring out */
	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	/* local buffer headers are read without taking the header lock */
	if (RelationUsesLocalBuffers(rel))
	{
		buf_hdr = GetLocalBufferDescriptor(-buf - 1);
		buf_state = pg_atomic_read_u32(&buf_hdr->state);
	}
	else
	{
		buf_hdr = GetBufferDescriptor(buf - 1);
		buf_state = LockBufHdr(buf_hdr);
	}

	/*
	 * We should be the only backend accessing this buffer. This is just a
	 * small bit of belt-and-suspenders defense, none of this code should ever
	 * run in a cluster with real data.
	 */
	if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
		was_pinned = true;
	else
		buf_state &= ~(BM_VALID | BM_DIRTY);

	/* write the state back / unlock the header before possibly erroring out */
	if (RelationUsesLocalBuffers(rel))
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
	else
		UnlockBufHdr(buf_hdr, buf_state);

	if (was_pinned)
		elog(ERROR, "toy buffer %d was already pinned",
			 buf);

	return buf;
}
/*
 * A "low level" read. This does similar things to what
 * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
 * less sanity).
 *
 * Arguments: relation, first block, number of blocks, wait_complete,
 * batchmode_enter, smgrreleaseall, batchmode_exit, zero_on_error. The four
 * booleans between nblocks and zero_on_error select optional steps that are
 * performed around starting the IO, see below.
 */
PG_FUNCTION_INFO_V1(read_rel_block_ll);
Datum
read_rel_block_ll(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	BlockNumber blkno = PG_GETARG_UINT32(1);
	int			nblocks = PG_GETARG_INT32(2);
	bool		wait_complete = PG_GETARG_BOOL(3);
	bool		batchmode_enter = PG_GETARG_BOOL(4);
	bool		call_smgrreleaseall = PG_GETARG_BOOL(5);
	bool		batchmode_exit = PG_GETARG_BOOL(6);
	bool		zero_on_error = PG_GETARG_BOOL(7);
	Relation	rel;
	Buffer		bufs[PG_IOV_MAX];
	BufferDesc *buf_hdrs[PG_IOV_MAX];
	Page		pages[PG_IOV_MAX];
	uint8		srb_flags = 0;
	PgAioReturn ior;
	PgAioHandle *ioh;
	PgAioWaitRef iow;
	SMgrRelation smgr;

	/* the arrays above have room for at most PG_IOV_MAX blocks */
	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
		elog(ERROR, "nblocks is out of range");

	rel = relation_open(relid, AccessExclusiveLock);

	/* set up a not-valid, pinned buffer for each block to be read */
	for (int i = 0; i < nblocks; i++)
	{
		bufs[i] = create_toy_buffer(rel, blkno + i);
		pages[i] = BufferGetBlock(bufs[i]);
		buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
			GetLocalBufferDescriptor(-bufs[i] - 1) :
			GetBufferDescriptor(bufs[i] - 1);
	}

	smgr = RelationGetSmgr(rel);

	pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);

	/* acquire the IO handle; its result gets reported into ior */
	ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
	pgaio_io_get_wref(ioh, &iow);

	/* mark all buffers as having IO in progress (return values are ignored) */
	if (RelationUsesLocalBuffers(rel))
	{
		for (int i = 0; i < nblocks; i++)
			StartLocalBufferIO(buf_hdrs[i], true, false);
		pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
	}
	else
	{
		for (int i = 0; i < nblocks; i++)
			StartBufferIO(buf_hdrs[i], true, false);
	}

	/* associate the buffer numbers with the handle */
	pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);

	/* the server-wide settings enable the same flags as our argument */
	if (zero_on_error | zero_damaged_pages)
		srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
	if (ignore_checksum_failure)
		srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;

	pgaio_io_register_callbacks(ioh,
								RelationUsesLocalBuffers(rel) ?
								PGAIO_HCB_LOCAL_BUFFER_READV :
								PGAIO_HCB_SHARED_BUFFER_READV,
								srb_flags);

	if (batchmode_enter)
		pgaio_enter_batchmode();

	/* start the read of all blocks */
	smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
				   (void *) pages, nblocks);

	if (call_smgrreleaseall)
		smgrreleaseall();

	if (batchmode_exit)
		pgaio_exit_batchmode();

	/* drop our pins, whether or not the IO has completed yet */
	for (int i = 0; i < nblocks; i++)
		ReleaseBuffer(bufs[i]);

	if (wait_complete)
	{
		pgaio_wref_wait(&iow);

		/* report failures: as ERROR for PGAIO_RS_ERROR, as WARNING otherwise */
		if (ior.result.status != PGAIO_RS_OK)
			pgaio_result_report(ior.result,
								&ior.target_data,
								ior.result.status == PGAIO_RS_ERROR ?
								ERROR : WARNING);
	}

	relation_close(rel, NoLock);

	PG_RETURN_VOID();
}
/*
 * SQL-callable: if the given block of the relation is currently buffered with
 * valid contents, write it out if dirty and then remove it from the buffer
 * pool. Does nothing if the block isn't buffered.
 */
PG_FUNCTION_INFO_V1(invalidate_rel_block);
Datum
invalidate_rel_block(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	BlockNumber blkno = PG_GETARG_UINT32(1);
	Relation	rel;
	PrefetchBufferResult pr;
	Buffer		buf;

	rel = relation_open(relid, AccessExclusiveLock);

	/*
	 * This is a gross hack, but there's no other API exposed that allows to
	 * get a buffer ID without actually reading the block in.
	 */
	pr = PrefetchBuffer(rel, MAIN_FORKNUM, blkno);
	buf = pr.recent_buffer;

	if (BufferIsValid(buf))
	{
		/* if the buffer contents aren't valid, this'll return false */
		if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
		{
			BufferDesc *buf_hdr = BufferIsLocal(buf) ?
				GetLocalBufferDescriptor(-buf - 1)
				: GetBufferDescriptor(buf - 1);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			/* write out modifications before throwing the buffer away */
			if (pg_atomic_read_u32(&buf_hdr->state) & BM_DIRTY)
			{
				if (BufferIsLocal(buf))
					FlushLocalBuffer(buf_hdr, NULL);
				else
					FlushOneBuffer(buf);
			}
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buf);

			/* unpinned now, so the buffer can be invalidated / evicted */
			if (BufferIsLocal(buf))
				InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
			else if (!EvictUnpinnedBuffer(buf))
				elog(ERROR, "couldn't evict");
		}
	}

	/*
	 * NOTE(review): this releases the lock at close, whereas the other
	 * functions in this file pass NoLock - confirm which is intended.
	 */
	relation_close(rel, AccessExclusiveLock);

	PG_RETURN_VOID();
}
/*
 * SQL-callable wrapper around create_toy_buffer(): sets up a toy buffer for
 * the given block, drops the pin on it and returns the buffer number.
 */
PG_FUNCTION_INFO_V1(buffer_create_toy);
Datum
buffer_create_toy(PG_FUNCTION_ARGS)
{
	Oid			relid = PG_GETARG_OID(0);
	BlockNumber blkno = PG_GETARG_UINT32(1);
	Relation	rel = relation_open(relid, AccessExclusiveLock);
	Buffer		toy = create_toy_buffer(rel, blkno);

	/* only the buffer number is handed back, not the pin */
	ReleaseBuffer(toy);
	relation_close(rel, NoLock);

	PG_RETURN_INT32(toy);
}
/*
 * SQL-callable: call StartBufferIO(), or StartLocalBufferIO() for a local
 * buffer, on the given buffer and return what it returned. The buffer state
 * after the call is written to the server log.
 */
PG_FUNCTION_INFO_V1(buffer_call_start_io);
Datum
buffer_call_start_io(PG_FUNCTION_ARGS)
{
	Buffer		buf = PG_GETARG_INT32(0);
	bool		for_input = PG_GETARG_BOOL(1);
	bool		nowait = PG_GETARG_BOOL(2);
	bool		is_local = BufferIsLocal(buf);
	bool		can_start;

	can_start = is_local ?
		StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
						   for_input, nowait) :
		StartBufferIO(GetBufferDescriptor(buf - 1),
					  for_input, nowait);

	/*
	 * For tests we don't want the resowner release preventing us from
	 * orchestrating odd scenarios.
	 */
	if (!is_local && can_start)
		ResourceOwnerForgetBufferIO(CurrentResourceOwner,
									buf);

	ereport(LOG,
			errmsg("buffer %d after StartBufferIO: %s",
				   buf, DebugPrintBufferRefcount(buf)),
			errhidestmt(true), errhidecontext(true));

	PG_RETURN_BOOL(can_start);
}
/*
 * SQL-callable: call TerminateBufferIO(), or TerminateLocalBufferIO() for a
 * local buffer, on the given buffer.
 *
 * The arguments are translated as follows: a successful read sets BM_VALID,
 * a successful write clears the dirty flag, and io_error sets BM_IO_ERROR.
 * The buffer state is written to the server log before and after the call.
 */
PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
Datum
buffer_call_terminate_io(PG_FUNCTION_ARGS)
{
	Buffer		buf = PG_GETARG_INT32(0);
	bool		for_input = PG_GETARG_BOOL(1);
	bool		succeed = PG_GETARG_BOOL(2);
	bool		io_error = PG_GETARG_BOOL(3);
	bool		release_aio = PG_GETARG_BOOL(4);

	/* only a successful write leaves the buffer clean */
	bool		clear_dirty = !for_input && succeed;
	uint32		set_flag_bits = io_error ? BM_IO_ERROR : 0;

	/* only a successful read makes the buffer valid */
	if (for_input && succeed)
		set_flag_bits |= BM_VALID;

	ereport(LOG,
			errmsg("buffer %d before Terminate[Local]BufferIO: %s",
				   buf, DebugPrintBufferRefcount(buf)),
			errhidestmt(true), errhidecontext(true));

	if (!BufferIsLocal(buf))
		TerminateBufferIO(GetBufferDescriptor(buf - 1),
						  clear_dirty, set_flag_bits, false, release_aio);
	else
		TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
							   clear_dirty, set_flag_bits, release_aio);

	ereport(LOG,
			errmsg("buffer %d after Terminate[Local]BufferIO: %s",
				   buf, DebugPrintBufferRefcount(buf)),
			errhidestmt(true), errhidecontext(true));

	PG_RETURN_VOID();
}
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
{
last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
PG_RETURN_VOID();
}
/*
 * SQL-callable: release the handle remembered by handle_get(). Errors out if
 * no handle has been remembered. The remembered pointer is left in place.
 */
PG_FUNCTION_INFO_V1(handle_release_last);
Datum
handle_release_last(PG_FUNCTION_ARGS)
{
	if (last_handle == NULL)
		elog(ERROR, "no handle");

	pgaio_io_release(last_handle);

	PG_RETURN_VOID();
}
/*
 * SQL-callable: acquire an AIO handle and then raise an error while still
 * holding it.
 */
PG_FUNCTION_INFO_V1(handle_get_and_error);
Datum
handle_get_and_error(PG_FUNCTION_ARGS)
{
	/* the handle is intentionally neither stored nor released */
	(void) pgaio_io_acquire(CurrentResourceOwner, NULL);

	elog(ERROR, "as you command");

	/* not reached */
	PG_RETURN_VOID();
}
/*
 * SQL-callable: acquire two AIO handles back to back, without releasing the
 * first one in between.
 */
PG_FUNCTION_INFO_V1(handle_get_twice);
Datum
handle_get_twice(PG_FUNCTION_ARGS)
{
	for (int attempt = 0; attempt < 2; attempt++)
		(void) pgaio_io_acquire(CurrentResourceOwner, NULL);

	PG_RETURN_VOID();
}
/*
 * SQL-callable: acquire an AIO handle and release it again immediately.
 */
PG_FUNCTION_INFO_V1(handle_get_release);
Datum
handle_get_release(PG_FUNCTION_ARGS)
{
	pgaio_io_release(pgaio_io_acquire(CurrentResourceOwner, NULL));

	PG_RETURN_VOID();
}
/*
 * SQL-callable: thin wrapper that calls pgaio_enter_batchmode().
 */
PG_FUNCTION_INFO_V1(batch_start);
Datum
batch_start(PG_FUNCTION_ARGS)
{
	/* batch mode stays entered after this function returns */
	pgaio_enter_batchmode();

	PG_RETURN_VOID();
}
/*
 * SQL-callable: thin wrapper that calls pgaio_exit_batchmode().
 */
PG_FUNCTION_INFO_V1(batch_end);
Datum
batch_end(PG_FUNCTION_ARGS)
{
	/* counterpart to batch_start() */
	pgaio_exit_batchmode();

	PG_RETURN_VOID();
}
#ifdef USE_INJECTION_POINTS
extern PGDLLEXPORT void inj_io_short_read(const char *name, const void *private_data);
extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data);
/*
 * Injection point callback, attached to AIO_PROCESS_COMPLETION_BEFORE_SHARED
 * by test_aio_shmem_startup().
 *
 * If enabled via inj_io_short_read_attach(), the result of a completed readv
 * is replaced with the configured value, provided that value isn't larger
 * than the real result. The part of the IO's memory beyond the new result is
 * zeroed, so that it cannot be mistaken for successfully read data.
 *
 * name and private_data are not used.
 */
void
inj_io_short_read(const char *name, const void *private_data)
{
	PgAioHandle *ioh;

	/* report the flag that controls *this* injection point */
	ereport(LOG,
			errmsg("short read injection point called, is enabled: %d",
				   inj_io_error_state->enabled_short_read),
			errhidestmt(true), errhidecontext(true));

	if (inj_io_error_state->enabled_short_read)
	{
		ioh = pgaio_inj_io_get();

		/*
		 * Only shorten reads that are actually longer than the target size,
		 * otherwise we can trigger over-reads.
		 */
		if (inj_io_error_state->short_read_result_set
			&& ioh->op == PGAIO_OP_READV
			&& inj_io_error_state->short_read_result <= ioh->result)
		{
			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
			int32		old_result = ioh->result;
			int32		new_result = inj_io_error_state->short_read_result;
			int32		processed = 0;

			ereport(LOG,
					errmsg("short read inject point, changing result from %d to %d",
						   old_result, new_result),
					errhidestmt(true), errhidecontext(true));

			/*
			 * The underlying IO actually completed OK, and thus the "invalid"
			 * portion of the IOV actually contains valid data. That can hide
			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
			 * that wasn't read according to the shortened-read, IO as valid,
			 * the contents would look valid and we might miss a bug.
			 *
			 * To avoid that, iterate through the IOV and zero out the
			 * "failed" portion of the IO.
			 */
			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
			{
				if (processed + iov[i].iov_len <= new_result)
				{
					/* entirely within the shortened result, keep as is */
					processed += iov[i].iov_len;
				}
				else if (processed <= new_result)
				{
					/* the shortened result ends inside this element */
					uint32		ok_part = new_result - processed;

					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
					processed += iov[i].iov_len;
				}
				else
				{
					/* entirely beyond the shortened result */
					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
				}
			}

			ioh->result = new_result;
		}
	}
}
/*
 * Injection point callback, attached to AIO_WORKER_AFTER_REOPEN by
 * test_aio_shmem_startup(). Raises an ERROR while enabled via
 * inj_io_reopen_attach(); otherwise it only logs that it was called.
 *
 * name and private_data are not used.
 */
void
inj_io_reopen(const char *name, const void *private_data)
{
	ereport(LOG,
			errmsg("reopen injection point called, is enabled: %d",
				   inj_io_error_state->enabled_reopen),
			errhidestmt(true), errhidecontext(true));

	if (!inj_io_error_state->enabled_reopen)
		return;

	elog(ERROR, "injection point triggering failure to reopen ");
}
#endif
/*
 * SQL-callable: enable the short read injection (see inj_io_short_read()),
 * recording the result to inject if the argument isn't NULL. Errors out in
 * builds without injection point support.
 */
PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
Datum
inj_io_short_read_attach(PG_FUNCTION_ARGS)
{
#ifndef USE_INJECTION_POINTS
	elog(ERROR, "injection points not supported");
#else
	bool		have_result = !PG_ARGISNULL(0);

	inj_io_error_state->enabled_short_read = true;
	inj_io_error_state->short_read_result_set = have_result;
	if (have_result)
		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
#endif

	PG_RETURN_VOID();
}
/*
 * SQL-callable: disable the short read injection again. Errors out in builds
 * without injection point support.
 */
PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
Datum
inj_io_short_read_detach(PG_FUNCTION_ARGS)
{
#ifndef USE_INJECTION_POINTS
	elog(ERROR, "injection points not supported");
#else
	inj_io_error_state->enabled_short_read = false;
#endif

	PG_RETURN_VOID();
}
/*
 * SQL-callable: make inj_io_reopen() raise an error from now on. Errors out
 * in builds without injection point support.
 */
PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
Datum
inj_io_reopen_attach(PG_FUNCTION_ARGS)
{
#ifndef USE_INJECTION_POINTS
	elog(ERROR, "injection points not supported");
#else
	inj_io_error_state->enabled_reopen = true;
#endif

	PG_RETURN_VOID();
}
/*
 * SQL-callable: stop inj_io_reopen() from raising errors. Errors out in
 * builds without injection point support.
 */
PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
Datum
inj_io_reopen_detach(PG_FUNCTION_ARGS)
{
#ifndef USE_INJECTION_POINTS
	elog(ERROR, "injection points not supported");
#else
	inj_io_error_state->enabled_reopen = false;
#endif

	PG_RETURN_VOID();
}

View File

@@ -0,0 +1,3 @@
# Extension control settings for the test_aio test module.
comment = 'Test code for AIO'
default_version = '1.0'
module_pathname = '$libdir/test_aio'