1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-19 13:42:17 +03:00

aio: Add core asynchronous I/O infrastructure

The main motivations to use AIO in PostgreSQL are:

a) Reduce the time spent waiting for IO by issuing IO sufficiently early.

   In a few places we have approximated this using posix_fadvise() based
   prefetching, but that is fairly limited (no completion feedback, double the
   syscalls, only works with buffered IO, only works on some OSs).

b) Allow to use Direct-I/O (DIO).

   DIO can offload most of the work for IO to hardware and thus increase
   throughput / decrease CPU utilization, as well as reduce latency.  While we
   have gained the ability to configure DIO in d4e71df6, it is not yet usable
   for real world workloads, as every IO is executed synchronously.

For portability, the new AIO infrastructure allows to implement AIO using
different methods. The choice of the AIO method is controlled by the new
io_method GUC. As of this commit, the only implemented method is "sync",
i.e. AIO is not actually executed asynchronously. The "sync" method exists to
allow to bypass most of the new code initially.

Subsequent commits will introduce additional IO methods, including a
cross-platform method implemented using worker processes and a linux specific
method using io_uring.

To allow different parts of postgres to use AIO, the core AIO infrastructure
does not need to know what kind of files it is operating on. The necessary
behavioral differences for different files are abstracted as "AIO
Targets". One example target would be smgr. For boring portability reasons,
all targets currently need to be added to an array in aio_target.c.  This
commit does not implement any AIO targets, just the infrastructure for
them. The smgr target will be added in a later commit.

Completion (and other events) of IOs for one type of file (i.e. one AIO
target) need to be reacted to differently, based on the IO operation and the
callsite. This is made possible by callbacks that can be registered on
IOs. E.g. an smgr read into a local buffer does not need to update the
corresponding BufferDesc (as there is none), but a read into shared buffers
does.  This commit does not contain any callbacks, they will be added in
subsequent commits.

For now the AIO infrastructure only understands READV and WRITEV operations,
but it is expected that more operations will be added. E.g. fsync/fdatasync,
flush_range and network operations like send/recv.

As of this commit, nothing uses the AIO infrastructure. Later commits will add
an smgr target, md.c and bufmgr.c callbacks and then finally use AIO for
read_stream.c IO, which, in one fell swoop, will convert all read stream users
to AIO.

The goal is to use AIO in many more places. There are patches to use AIO for
checkpointer and bgwriter that are reasonably close to being ready. There also
are prototypes to use it for WAL, relation extension, backend writes and many
more. Those prototypes were important to ensure the design of the AIO
subsystem is not too limiting (e.g. WAL writes need to happen in critical
sections, which influenced a lot of the design).

A future commit will add an AIO README explaining the AIO architecture and how
to use the AIO subsystem. The README is added later, as it references details
only added in later commits.

Many many more people than the folks named below have contributed with
feedback, work on semi-independent patches etc. E.g. various folks have
contributed patches to use the read stream infrastructure (added by Thomas in
b5a9b18cd0) in more places. Similarly, a *lot* of folks have contributed to
the CI infrastructure, which I had started to work on to make adding AIO
feasible.

Some of the work by contributors has gone into the "v1" prototype of AIO,
which heavily influenced the current design of the AIO subsystem. None of the
code from that directly survives, but without the prototype, the current
version of the AIO infrastructure would not exist.

Similarly, the reviewers below have not necessarily looked at the current
design or the whole infrastructure, but have provided very valuable input. I
am to blame for problems, not they.

Author: Andres Freund <andres@anarazel.de>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Co-authored-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Co-authored-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Jakub Wartak <jakub.wartak@enterprisedb.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Dmitry Dolgov <9erthalion6@gmail.com>
Reviewed-by: Antonin Houska <ah@cybertec.at>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
This commit is contained in:
Andres Freund
2025-03-17 18:51:33 -04:00
parent 02844012b3
commit da7226993f
13 changed files with 2834 additions and 0 deletions

View File

@@ -10,7 +10,11 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
aio.o \
aio_callback.o \
aio_init.o \
aio_io.o \
aio_target.o \
method_sync.o \
read_stream.o
include $(top_srcdir)/src/backend/common.mk

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,308 @@
/*-------------------------------------------------------------------------
*
* aio_callback.c
* AIO - Functionality related to callbacks that can be registered on IO
* Handles
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/aio_callback.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
/* just to have something to put into aio_handle_cbs */
static const PgAioHandleCallbacks aio_invalid_cb = {0};
typedef struct PgAioHandleCallbacksEntry
{
const PgAioHandleCallbacks *const cb;
const char *const name;
} PgAioHandleCallbacksEntry;
/*
* Callback definition for the callbacks that can be registered on an IO
* handle. See PgAioHandleCallbackID's definition for an explanation for why
* callbacks are not identified by a pointer.
*/
static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
#undef CALLBACK_ENTRY
};
/* --------------------------------------------------------------------------------
* Public callback related functions operating on IO Handles
* --------------------------------------------------------------------------------
*/
/*
* Register callback for the IO handle.
*
* Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
* registered for each IO.
*
* Callbacks need to be registered before [indirectly] calling
* pgaio_io_prep_*(), as the IO may be executed immediately.
*
* A callback can be passed a small bit of data, e.g. to indicate whether to
* zero a buffer if it is invalid.
*
*
* Note that callbacks are executed in critical sections. This is necessary
* to be able to execute IO in critical sections (consider e.g. WAL
* logging). To perform AIO we first need to acquire a handle, which, if there
* are no free handles, requires waiting for IOs to complete and to execute
* their completion callbacks.
*
* Callbacks may be executed in the issuing backend but also in another
* backend (because that backend is waiting for the IO) or in IO workers (if
* io_method=worker is used).
*
*
* See PgAioHandleCallbackID's definition for an explanation for why
* callbacks are not identified by a pointer.
*/
void
pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id,
uint8 cb_data)
{
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
if (cb_id >= lengthof(aio_handle_cbs))
elog(ERROR, "callback %d is out of range", cb_id);
if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
aio_handle_cbs[cb_id].cb->complete_local == NULL)
elog(ERROR, "callback %d does not have a completion callback", cb_id);
if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS)
elog(PANIC, "too many callbacks, the max is %d",
PGAIO_HANDLE_MAX_CALLBACKS);
ioh->callbacks[ioh->num_callbacks] = cb_id;
ioh->callbacks_data[ioh->num_callbacks] = cb_data;
pgaio_debug_io(DEBUG3, ioh,
"adding cb #%d, id %d/%s",
ioh->num_callbacks + 1,
cb_id, ce->name);
ioh->num_callbacks++;
}
/*
* Associate an array of data with the Handle. This is e.g. useful to the
* transport knowledge about which buffers a multi-block IO affects to
* completion callbacks.
*
* Right now this can be done only once for each IO, even though multiple
* callbacks can be registered. There aren't any known usecases requiring more
* and the required amount of shared memory does add up, so it doesn't seem
* worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
*/
void
pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
{
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
Assert(ioh->handle_data_len == 0);
Assert(len <= PG_IOV_MAX);
for (int i = 0; i < len; i++)
pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
ioh->handle_data_len = len;
}
/*
* Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
* array to a 64bit array. Without it callers would end up needing to
* open-code equivalent code.
*/
void
pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
{
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
Assert(ioh->handle_data_len == 0);
Assert(len <= PG_IOV_MAX);
for (int i = 0; i < len; i++)
pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
ioh->handle_data_len = len;
}
/*
* Return data set with pgaio_io_set_handle_data_*().
*/
uint64 *
pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
{
Assert(ioh->handle_data_len > 0);
*len = ioh->handle_data_len;
return &pgaio_ctl->handle_data[ioh->iovec_off];
}
/* --------------------------------------------------------------------------------
* Public IO Result related functions
* --------------------------------------------------------------------------------
*/
void
pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
{
PgAioHandleCallbackID cb_id = result.id;
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
Assert(result.status != ARS_UNKNOWN);
Assert(result.status != ARS_OK);
if (ce->cb->report == NULL)
elog(ERROR, "callback %d/%s does not have report callback",
result.id, ce->name);
ce->cb->report(result, target_data, elevel);
}
/* --------------------------------------------------------------------------------
* Internal callback related functions operating on IO Handles
* --------------------------------------------------------------------------------
*/
/*
* Internal function which invokes ->stage for all the registered callbacks.
*/
void
pgaio_io_call_stage(PgAioHandle *ioh)
{
Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
for (int i = ioh->num_callbacks; i > 0; i--)
{
PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
uint8 cb_data = ioh->callbacks_data[i - 1];
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
if (!ce->cb->stage)
continue;
pgaio_debug_io(DEBUG3, ioh,
"calling cb #%d %d/%s->stage(%u)",
i, cb_id, ce->name, cb_data);
ce->cb->stage(ioh, cb_data);
}
}
/*
* Internal function which invokes ->complete_shared for all the registered
* callbacks.
*/
void
pgaio_io_call_complete_shared(PgAioHandle *ioh)
{
PgAioResult result;
START_CRIT_SECTION();
Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
result.status = ARS_OK; /* low level IO is always considered OK */
result.result = ioh->result;
result.id = PGAIO_HCB_INVALID;
result.error_data = 0;
/*
* Call callbacks with the last registered (innermost) callback first.
* Each callback can modify the result forwarded to the next callback.
*/
for (int i = ioh->num_callbacks; i > 0; i--)
{
PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
uint8 cb_data = ioh->callbacks_data[i - 1];
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
if (!ce->cb->complete_shared)
continue;
pgaio_debug_io(DEBUG4, ioh,
"calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
i, cb_id, ce->name,
cb_data,
pgaio_result_status_string(result.status),
result.id, result.error_data, result.result);
result = ce->cb->complete_shared(ioh, result, cb_data);
}
ioh->distilled_result = result;
pgaio_debug_io(DEBUG3, ioh,
"after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
pgaio_result_status_string(result.status),
result.id, result.error_data, result.result,
ioh->result);
END_CRIT_SECTION();
}
/*
* Internal function which invokes ->complete_local for all the registered
* callbacks.
*
* XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
*/
void
pgaio_io_call_complete_local(PgAioHandle *ioh)
{
PgAioResult result;
START_CRIT_SECTION();
Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
/* start with distilled result from shared callback */
result = ioh->distilled_result;
for (int i = ioh->num_callbacks; i > 0; i--)
{
PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
uint8 cb_data = ioh->callbacks_data[i - 1];
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
if (!ce->cb->complete_local)
continue;
pgaio_debug_io(DEBUG4, ioh,
"calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
i, cb_id, ce->name, cb_data,
pgaio_result_status_string(result.status),
result.id, result.error_data, result.result);
result = ce->cb->complete_local(ioh, result, cb_data);
}
/*
* Note that we don't save the result in ioh->distilled_result, the local
* callback's result should not ever matter to other waiters.
*/
pgaio_debug_io(DEBUG3, ioh,
"after local completion: distilled result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
pgaio_result_status_string(result.status),
result.id, result.error_data, result.result,
ioh->result);
END_CRIT_SECTION();
}

View File

@@ -14,24 +14,222 @@
#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/guc.h"
static Size
AioCtlShmemSize(void)
{
Size sz;
/* pgaio_ctl itself */
sz = offsetof(PgAioCtl, io_handles);
return sz;
}
static uint32
AioProcs(void)
{
return MaxBackends + NUM_AUXILIARY_PROCS;
}
static Size
AioBackendShmemSize(void)
{
return mul_size(AioProcs(), sizeof(PgAioBackend));
}
static Size
AioHandleShmemSize(void)
{
Size sz;
/* verify AioChooseMaxConcurrency() did its thing */
Assert(io_max_concurrency > 0);
/* io handles */
sz = mul_size(AioProcs(),
mul_size(io_max_concurrency, sizeof(PgAioHandle)));
return sz;
}
static Size
AioHandleIOVShmemSize(void)
{
/*
* Each IO handle can have an PG_IOV_MAX long iovec.
*
* XXX: Right now the amount of space available for each IO is PG_IOV_MAX.
* While it's tempting to use the io_combine_limit GUC, that's
* PGC_USERSET, so we can't allocate shared memory based on that.
*/
return mul_size(sizeof(struct iovec),
mul_size(mul_size(PG_IOV_MAX, AioProcs()),
io_max_concurrency));
}
static Size
AioHandleDataShmemSize(void)
{
/* each buffer referenced by an iovec can have associated data */
return mul_size(sizeof(uint64),
mul_size(mul_size(PG_IOV_MAX, AioProcs()),
io_max_concurrency));
}
/*
* Choose a suitable value for io_max_concurrency.
*
* It's unlikely that we could have more IOs in flight than buffers that we
* would be allowed to pin.
*
* On the upper end, apply a cap too - just because shared_buffers is large,
* it doesn't make sense have millions of buffers undergo IO concurrently.
*/
static int
AioChooseMaxConcurrency(void)
{
uint32 max_backends;
int max_proportional_pins;
/* Similar logic to LimitAdditionalPins() */
max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
max_proportional_pins = NBuffers / max_backends;
max_proportional_pins = Max(max_proportional_pins, 1);
/* apply upper limit */
return Min(max_proportional_pins, 64);
}
Size
AioShmemSize(void)
{
Size sz = 0;
/*
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
* However, if the DBA explicitly set io_max_concurrency = -1 in the
* config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
* we must force the matter with PGC_S_OVERRIDE.
*/
if (io_max_concurrency == -1)
{
char buf[32];
snprintf(buf, sizeof(buf), "%d", AioChooseMaxConcurrency());
SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
PGC_S_DYNAMIC_DEFAULT);
if (io_max_concurrency == -1) /* failed to apply it? */
SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
PGC_S_OVERRIDE);
}
sz = add_size(sz, AioCtlShmemSize());
sz = add_size(sz, AioBackendShmemSize());
sz = add_size(sz, AioHandleShmemSize());
sz = add_size(sz, AioHandleIOVShmemSize());
sz = add_size(sz, AioHandleDataShmemSize());
/* Reserve space for method specific resources. */
if (pgaio_method_ops->shmem_size)
sz = add_size(sz, pgaio_method_ops->shmem_size());
return sz;
}
void
AioShmemInit(void)
{
bool found;
uint32 io_handle_off = 0;
uint32 iovec_off = 0;
uint32 per_backend_iovecs = io_max_concurrency * PG_IOV_MAX;
pgaio_ctl = (PgAioCtl *)
ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
if (found)
goto out;
memset(pgaio_ctl, 0, AioCtlShmemSize());
pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
pgaio_ctl->backend_state = (PgAioBackend *)
ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
pgaio_ctl->io_handles = (PgAioHandle *)
ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
pgaio_ctl->iovecs = (struct iovec *)
ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found);
pgaio_ctl->handle_data = (uint64 *)
ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found);
for (int procno = 0; procno < AioProcs(); procno++)
{
PgAioBackend *bs = &pgaio_ctl->backend_state[procno];
bs->io_handle_off = io_handle_off;
io_handle_off += io_max_concurrency;
dclist_init(&bs->idle_ios);
memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE);
dclist_init(&bs->in_flight_ios);
/* initialize per-backend IOs */
for (int i = 0; i < io_max_concurrency; i++)
{
PgAioHandle *ioh = &pgaio_ctl->io_handles[bs->io_handle_off + i];
ioh->generation = 1;
ioh->owner_procno = procno;
ioh->iovec_off = iovec_off;
ioh->handle_data_len = 0;
ioh->report_return = NULL;
ioh->resowner = NULL;
ioh->num_callbacks = 0;
ioh->distilled_result.status = ARS_UNKNOWN;
ioh->flags = 0;
ConditionVariableInit(&ioh->cv);
dclist_push_tail(&bs->idle_ios, &ioh->node);
iovec_off += PG_IOV_MAX;
}
}
out:
/* Initialize IO method specific resources. */
if (pgaio_method_ops->shmem_init)
pgaio_method_ops->shmem_init(!found);
}
void
pgaio_init_backend(void)
{
/* shouldn't be initialized twice */
Assert(!pgaio_my_backend);
if (MyProc == NULL || MyProcNumber >= AioProcs())
elog(ERROR, "aio requires a normal PGPROC");
pgaio_my_backend = &pgaio_ctl->backend_state[MyProcNumber];
if (pgaio_method_ops->init_backend)
pgaio_method_ops->init_backend();
before_shmem_exit(pgaio_shutdown, 0);
}

View File

@@ -0,0 +1,184 @@
/*-------------------------------------------------------------------------
*
* aio_io.c
* AIO - Low Level IO Handling
*
* Functions related to associating IO operations to IO Handles and IO-method
* independent support functions for actually performing IO.
*
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/aio_io.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/fd.h"
#include "utils/wait_event.h"
static void pgaio_io_before_prep(PgAioHandle *ioh);
/* --------------------------------------------------------------------------------
* Public IO related functions operating on IO Handles
* --------------------------------------------------------------------------------
*/
/*
* Scatter/gather IO needs to associate an iovec with the Handle. To support
* worker mode this data needs to be in shared memory.
*/
int
pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov)
{
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
*iov = &pgaio_ctl->iovecs[ioh->iovec_off];
return PG_IOV_MAX;
}
PgAioOp
pgaio_io_get_op(PgAioHandle *ioh)
{
return ioh->op;
}
PgAioOpData *
pgaio_io_get_op_data(PgAioHandle *ioh)
{
return &ioh->op_data;
}
/* --------------------------------------------------------------------------------
* "Preparation" routines for individual IO operations
*
* These are called by the code actually initiating an IO, to associate the IO
* specific data with an AIO handle.
*
* Each of the preparation routines first needs to call
* pgaio_io_before_prep(), then fill IO specific fields in the handle and then
* finally call pgaio_io_stage().
* --------------------------------------------------------------------------------
*/
void
pgaio_io_prep_readv(PgAioHandle *ioh,
int fd, int iovcnt, uint64 offset)
{
pgaio_io_before_prep(ioh);
ioh->op_data.read.fd = fd;
ioh->op_data.read.offset = offset;
ioh->op_data.read.iov_length = iovcnt;
pgaio_io_stage(ioh, PGAIO_OP_READV);
}
void
pgaio_io_prep_writev(PgAioHandle *ioh,
int fd, int iovcnt, uint64 offset)
{
pgaio_io_before_prep(ioh);
ioh->op_data.write.fd = fd;
ioh->op_data.write.offset = offset;
ioh->op_data.write.iov_length = iovcnt;
pgaio_io_stage(ioh, PGAIO_OP_WRITEV);
}
/* --------------------------------------------------------------------------------
* Internal IO related functions operating on IO Handles
* --------------------------------------------------------------------------------
*/
/*
* Execute IO operation synchronously. This is implemented here, not in
* method_sync.c, because other IO methods also might use it / fall back to
* it.
*/
void
pgaio_io_perform_synchronously(PgAioHandle *ioh)
{
ssize_t result = 0;
struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
START_CRIT_SECTION();
/* Perform IO. */
switch (ioh->op)
{
case PGAIO_OP_READV:
pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_READ);
result = pg_preadv(ioh->op_data.read.fd, iov,
ioh->op_data.read.iov_length,
ioh->op_data.read.offset);
pgstat_report_wait_end();
break;
case PGAIO_OP_WRITEV:
pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_WRITE);
result = pg_pwritev(ioh->op_data.write.fd, iov,
ioh->op_data.write.iov_length,
ioh->op_data.write.offset);
pgstat_report_wait_end();
break;
case PGAIO_OP_INVALID:
elog(ERROR, "trying to execute invalid IO operation");
}
ioh->result = result < 0 ? -errno : result;
pgaio_io_process_completion(ioh, ioh->result);
END_CRIT_SECTION();
}
/*
* Helper function to be called by IO operation preparation functions, before
* any data in the handle is set. Mostly to centralize assertions.
*/
static void
pgaio_io_before_prep(PgAioHandle *ioh)
{
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
Assert(pgaio_my_backend->handed_out_io == ioh);
Assert(pgaio_io_has_target(ioh));
Assert(ioh->op == PGAIO_OP_INVALID);
}
/*
* Could be made part of the public interface, but it's not clear there's
* really a use case for that.
*/
const char *
pgaio_io_get_op_name(PgAioHandle *ioh)
{
Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
switch (ioh->op)
{
case PGAIO_OP_INVALID:
return "invalid";
case PGAIO_OP_READV:
return "read";
case PGAIO_OP_WRITEV:
return "write";
}
return NULL; /* silence compiler */
}

View File

@@ -0,0 +1,114 @@
/*-------------------------------------------------------------------------
*
* aio_target.c
* AIO - Functionality related to executing IO for different targets
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/aio_target.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
/*
* Registry for entities that can be the target of AIO.
*/
static const PgAioTargetInfo *pgaio_target_info[] = {
[PGAIO_TID_INVALID] = &(PgAioTargetInfo) {
.name = "invalid",
},
};
/* --------------------------------------------------------------------------------
* Public target related functions operating on IO Handles
* --------------------------------------------------------------------------------
*/
bool
pgaio_io_has_target(PgAioHandle *ioh)
{
return ioh->target != PGAIO_TID_INVALID;
}
/*
* Return the name for the target associated with the IO. Mostly useful for
* debugging/logging.
*/
const char *
pgaio_io_get_target_name(PgAioHandle *ioh)
{
Assert(ioh->target >= 0 && ioh->target < PGAIO_TID_COUNT);
return pgaio_target_info[ioh->target]->name;
}
/*
* Assign a target to the IO.
*
* This has to be called exactly once before pgaio_io_prep_*() is called.
*/
void
pgaio_io_set_target(PgAioHandle *ioh, PgAioTargetID targetid)
{
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
Assert(ioh->target == PGAIO_TID_INVALID);
ioh->target = targetid;
}
PgAioTargetData *
pgaio_io_get_target_data(PgAioHandle *ioh)
{
return &ioh->target_data;
}
/*
* Return a stringified description of the IO's target.
*
* The string is localized and allocated in the current memory context.
*/
char *
pgaio_io_get_target_description(PgAioHandle *ioh)
{
return pgaio_target_info[ioh->target]->describe_identity(&ioh->target_data);
}
/* --------------------------------------------------------------------------------
* Internal target related functions operating on IO Handles
* --------------------------------------------------------------------------------
*/
/*
* Internal: Check if pgaio_io_reopen() is available for the IO.
*/
bool
pgaio_io_can_reopen(PgAioHandle *ioh)
{
return pgaio_target_info[ioh->target]->reopen != NULL;
}
/*
* Internal: Before executing an IO outside of the context of the process the
* IO has been prepared in, the file descriptor has to be reopened - any FD
* referenced in the IO itself, won't be valid in the separate process.
*/
void
pgaio_io_reopen(PgAioHandle *ioh)
{
Assert(ioh->target >= 0 && ioh->target < PGAIO_TID_COUNT);
Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
pgaio_target_info[ioh->target]->reopen(ioh);
}

View File

@@ -2,6 +2,10 @@
backend_sources += files(
'aio.c',
'aio_callback.c',
'aio_init.c',
'aio_io.c',
'aio_target.c',
'method_sync.c',
'read_stream.c',
)

View File

@@ -0,0 +1,47 @@
/*-------------------------------------------------------------------------
*
* method_sync.c
* AIO - perform "AIO" by executing it synchronously
*
* This method is mainly to check if AIO use causes regressions. Other IO
* methods might also fall back to the synchronous method for functionality
* they cannot provide.
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/method_sync.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
static bool pgaio_sync_needs_synchronous_execution(PgAioHandle *ioh);
static int pgaio_sync_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
const IoMethodOps pgaio_sync_ops = {
.needs_synchronous_execution = pgaio_sync_needs_synchronous_execution,
.submit = pgaio_sync_submit,
};
static bool
pgaio_sync_needs_synchronous_execution(PgAioHandle *ioh)
{
return true;
}
static int
pgaio_sync_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
elog(ERROR, "IO should have been executed synchronously");
return 0;
}

View File

@@ -191,6 +191,7 @@ ABI_compatibility:
Section: ClassName - WaitEventIO
AIO_IO_COMPLETION "Waiting for IO completion."
BASEBACKUP_READ "Waiting for base backup to read from a file."
BASEBACKUP_SYNC "Waiting for data written by a base backup to reach durable storage."
BASEBACKUP_WRITE "Waiting for base backup to write to a file."