diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile index eaeaeeee8e3..89f821ea7e1 100644 --- a/src/backend/storage/aio/Makefile +++ b/src/backend/storage/aio/Makefile @@ -10,7 +10,11 @@ include $(top_builddir)/src/Makefile.global OBJS = \ aio.o \ + aio_callback.o \ aio_init.o \ + aio_io.o \ + aio_target.o \ + method_sync.o \ read_stream.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 828a94efdc3..4d5439c73fd 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -3,6 +3,28 @@ * aio.c * AIO - Core Logic * + * For documentation about how AIO works on a higher level, including a + * schematic example, see README.md. + * + * + * AIO is a complicated subsystem. To keep things navigable, it is split + * across a number of files: + * + * - method_*.c - different ways of executing AIO (e.g. worker process) + * + * - aio_target.c - IO on different kinds of targets + * + * - aio_io.c - method-independent code for specific IO ops (e.g. readv) + * + * - aio_callback.c - callbacks at IO operation lifecycle events + * + * - aio_init.c - per-server and per-backend initialization + * + * - aio.c - all other topics + * + * - read_stream.c - helper for reading buffered relation data + * + * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -15,10 +37,28 @@ #include "postgres.h" #include "lib/ilist.h" +#include "miscadmin.h" +#include "port/atomics.h" #include "storage/aio.h" +#include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "utils/guc.h" #include "utils/guc_hooks.h" +#include "utils/resowner.h" +#include "utils/wait_event_types.h" + +#ifdef USE_INJECTION_POINTS +#include "utils/injection_point.h" +#endif + + +static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state); +static void pgaio_io_reclaim(PgAioHandle *ioh); +static void pgaio_io_resowner_register(PgAioHandle *ioh); +static void pgaio_io_wait_for_free(void); +static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation); +static const char *pgaio_io_state_get_name(PgAioHandleState s); +static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation); /* Options for io_method. */ @@ -31,7 +71,179 @@ const struct config_enum_entry io_method_options[] = { int io_method = DEFAULT_IO_METHOD; int io_max_concurrency = -1; +/* global control for AIO */ +PgAioCtl *pgaio_ctl; +/* current backend's per-backend state */ +PgAioBackend *pgaio_my_backend; + + +static const IoMethodOps *const pgaio_method_ops_table[] = { + [IOMETHOD_SYNC] = &pgaio_sync_ops, +}; + +/* callbacks for the configured io_method, set by assign_io_method */ +const IoMethodOps *pgaio_method_ops; + + +/* + * Currently there's no infrastructure to pass arguments to injection points, + * so we instead set this up for the duration of the injection point + * invocation. See pgaio_io_call_inj(). + */ +#ifdef USE_INJECTION_POINTS +static PgAioHandle *pgaio_inj_cur_handle; +#endif + + + +/* -------------------------------------------------------------------------------- + * Public Functions related to PgAioHandle + * -------------------------------------------------------------------------------- + */ + +/* + * Acquire an AioHandle, waiting for IO completion if necessary. + * + * Each backend can only have one AIO handle that has been "handed out" to + * code, but not yet submitted or released. This restriction is necessary to + * ensure that it is possible for code to wait for an unused handle by waiting + * for in-flight IO to complete. There is a limited number of handles in each + * backend, if multiple handles could be handed out without being submitted, + * waiting for all in-flight IO to complete would not guarantee that handles + * free up. + * + * It is cheap to acquire an IO handle, unless all handles are in use. In that + * case this function waits for the oldest IO to complete. If that is not + * desirable, use pgaio_io_acquire_nb(). + * + * If a handle was acquired but then does not turn out to be needed, + * e.g. because pgaio_io_acquire() is called before starting an IO in a + * critical section, the handle needs to be released with pgaio_io_release(). + * + * + * To react to the completion of the IO as soon as it is known to have + * completed, callbacks can be registered with pgaio_io_register_callbacks(). + * + * To actually execute IO using the returned handle, the pgaio_io_prep_*() + * family of functions is used. In many cases the pgaio_io_prep_*() call will + * not be done directly by code that acquired the handle, but by lower level + * code that gets passed the handle. E.g. if code in bufmgr.c wants to perform + * AIO, it typically will pass the handle to smgr.c, which will pass it on to + * md.c, on to fd.c, which then finally calls pgaio_io_prep_*(). This + * forwarding allows the various layers to react to the IO's completion by + * registering callbacks. These callbacks in turn can translate a lower + * layer's result into a result understandable by a higher layer. + * + * During pgaio_io_prep_*() the IO is staged (i.e. prepared for execution but + * not submitted to the kernel). Unless in batchmode + * (c.f. pgaio_enter_batchmode()), the IO will also get submitted for + * execution. Note that, whether in batchmode or not, the IO might even + * complete before the functions return. + * + * After pgaio_io_prep_*() the AioHandle is "consumed" and may not be + * referenced by the IO issuing code. To e.g. wait for IO, references to the + * IO can be established with pgaio_io_get_wref() *before* pgaio_io_prep_*() + * is called. pgaio_wref_wait() can be used to wait for the IO to complete. + * + * + * To know if the IO [partially] succeeded or failed, a PgAioReturn * can be + * passed to pgaio_io_acquire(). Once the issuing backend has called + * pgaio_wref_wait(), the PgAioReturn contains information about whether the + * operation succeeded and details about the first failure, if any. The error + * can be raised / logged with pgaio_result_report(). + * + * The lifetime of the memory pointed to be *ret needs to be at least as long + * as the passed in resowner. If the resowner releases resources before the IO + * completes (typically due to an error), the reference to *ret will be + * cleared. In case of resowner cleanup *ret will not be updated with the + * results of the IO operation. + */ +PgAioHandle * +pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret) +{ + PgAioHandle *h; + + while (true) + { + h = pgaio_io_acquire_nb(resowner, ret); + + if (h != NULL) + return h; + + /* + * Evidently all handles by this backend are in use. Just wait for + * some to complete. + */ + pgaio_io_wait_for_free(); + } +} + +/* + * Acquire an AioHandle, returning NULL if no handles are free. + * + * See pgaio_io_acquire(). The only difference is that this function will return + * NULL if there are no idle handles, instead of blocking. + */ +PgAioHandle * +pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret) +{ + if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE) + { + Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE); + pgaio_submit_staged(); + } + + if (pgaio_my_backend->handed_out_io) + elog(ERROR, "API violation: Only one IO can be handed out"); + + if (!dclist_is_empty(&pgaio_my_backend->idle_ios)) + { + dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios); + PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion); + + Assert(ioh->state == PGAIO_HS_IDLE); + Assert(ioh->owner_procno == MyProcNumber); + + pgaio_io_update_state(ioh, PGAIO_HS_HANDED_OUT); + pgaio_my_backend->handed_out_io = ioh; + + if (resowner) + pgaio_io_resowner_register(ioh); + + if (ret) + { + ioh->report_return = ret; + ret->result.status = ARS_UNKNOWN; + } + + return ioh; + } + + return NULL; +} + +/* + * Release IO handle that turned out to not be required. + * + * See pgaio_io_acquire() for more details. + */ +void +pgaio_io_release(PgAioHandle *ioh) +{ + if (ioh == pgaio_my_backend->handed_out_io) + { + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(ioh->resowner); + + pgaio_my_backend->handed_out_io = NULL; + pgaio_io_reclaim(ioh); + } + else + { + elog(ERROR, "release in unexpected state"); + } +} /* * Release IO handle during resource owner cleanup. @@ -39,8 +251,795 @@ int io_max_concurrency = -1; void pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) { + PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node); + + Assert(ioh->resowner); + + ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); + ioh->resowner = NULL; + + switch (ioh->state) + { + case PGAIO_HS_IDLE: + elog(ERROR, "unexpected"); + break; + case PGAIO_HS_HANDED_OUT: + Assert(ioh == pgaio_my_backend->handed_out_io || pgaio_my_backend->handed_out_io == NULL); + + if (ioh == pgaio_my_backend->handed_out_io) + { + pgaio_my_backend->handed_out_io = NULL; + if (!on_error) + elog(WARNING, "leaked AIO handle"); + } + + pgaio_io_reclaim(ioh); + break; + case PGAIO_HS_DEFINED: + case PGAIO_HS_STAGED: + if (!on_error) + elog(WARNING, "AIO handle was not submitted"); + pgaio_submit_staged(); + break; + case PGAIO_HS_SUBMITTED: + case PGAIO_HS_COMPLETED_IO: + case PGAIO_HS_COMPLETED_SHARED: + case PGAIO_HS_COMPLETED_LOCAL: + /* this is expected to happen */ + break; + } + + /* + * Need to unregister the reporting of the IO's result, the memory it's + * referencing likely has gone away. + */ + if (ioh->report_return) + ioh->report_return = NULL; } +/* + * Add a [set of] flags to the IO. + * + * Note that this combines flags with already set flags, rather than set flags + * to explicitly the passed in parameters. This is to allow multiple callsites + * to set flags. + */ +void +pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + + ioh->flags |= flag; +} + +/* + * Returns an ID uniquely identifying the IO handle. This is only really + * useful for logging, as handles are reused across multiple IOs. + */ +int +pgaio_io_get_id(PgAioHandle *ioh) +{ + Assert(ioh >= pgaio_ctl->io_handles && + ioh < (pgaio_ctl->io_handles + pgaio_ctl->io_handle_count)); + return ioh - pgaio_ctl->io_handles; +} + +/* + * Return the ProcNumber for the process that can use an IO handle. The + * mapping from IO handles to PGPROCs is static, therefore this even works + * when the corresponding PGPROC is not in use. + */ +ProcNumber +pgaio_io_get_owner(PgAioHandle *ioh) +{ + return ioh->owner_procno; +} + +/* + * Return a wait reference for the IO. Only wait references can be used to + * wait for an IOs completion, as handles themselves can be reused after + * completion. See also the comment above pgaio_io_acquire(). + */ +void +pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT || + ioh->state == PGAIO_HS_DEFINED || + ioh->state == PGAIO_HS_STAGED); + Assert(ioh->generation != 0); + + iow->aio_index = ioh - pgaio_ctl->io_handles; + iow->generation_upper = (uint32) (ioh->generation >> 32); + iow->generation_lower = (uint32) ioh->generation; +} + + + +/* -------------------------------------------------------------------------------- + * Internal Functions related to PgAioHandle + * -------------------------------------------------------------------------------- + */ + +static inline void +pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state) +{ + pgaio_debug_io(DEBUG5, ioh, + "updating state to %s", + pgaio_io_state_get_name(new_state)); + + /* + * Ensure the changes signified by the new state are visible before the + * new state becomes visible. + */ + pg_write_barrier(); + + ioh->state = new_state; +} + +static void +pgaio_io_resowner_register(PgAioHandle *ioh) +{ + Assert(!ioh->resowner); + Assert(CurrentResourceOwner); + + ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node); + ioh->resowner = CurrentResourceOwner; +} + +/* + * Stage IO for execution and, if appropriate, submit it immediately. + * + * Should only be called from pgaio_io_prep_*(). + */ +void +pgaio_io_stage(PgAioHandle *ioh, PgAioOp op) +{ + bool needs_synchronous; + + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(pgaio_my_backend->handed_out_io == ioh); + Assert(pgaio_io_has_target(ioh)); + + ioh->op = op; + ioh->result = 0; + + pgaio_io_update_state(ioh, PGAIO_HS_DEFINED); + + /* allow a new IO to be staged */ + pgaio_my_backend->handed_out_io = NULL; + + pgaio_io_call_stage(ioh); + + pgaio_io_update_state(ioh, PGAIO_HS_STAGED); + + /* + * Synchronous execution has to be executed, well, synchronously, so check + * that first. + */ + needs_synchronous = pgaio_io_needs_synchronous_execution(ioh); + + pgaio_debug_io(DEBUG3, ioh, + "prepared (synchronous: %d, in_batch: %d)", + needs_synchronous, pgaio_my_backend->in_batchmode); + + if (!needs_synchronous) + { + pgaio_my_backend->staged_ios[pgaio_my_backend->num_staged_ios++] = ioh; + Assert(pgaio_my_backend->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); + + /* + * Unless code explicitly opted into batching IOs, submit the IO + * immediately. + */ + if (!pgaio_my_backend->in_batchmode) + pgaio_submit_staged(); + } + else + { + pgaio_io_prepare_submit(ioh); + pgaio_io_perform_synchronously(ioh); + } +} + +bool +pgaio_io_needs_synchronous_execution(PgAioHandle *ioh) +{ + /* + * If the caller said to execute the IO synchronously, do so. + * + * XXX: We could optimize the logic when to execute synchronously by first + * checking if there are other IOs in flight and only synchronously + * executing if not. Unclear whether that'll be sufficiently common to be + * worth worrying about. + */ + if (ioh->flags & PGAIO_HF_SYNCHRONOUS) + return true; + + /* Check if the IO method requires synchronous execution of IO */ + if (pgaio_method_ops->needs_synchronous_execution) + return pgaio_method_ops->needs_synchronous_execution(ioh); + + return false; +} + +/* + * Handle IO being processed by IO method. + * + * Should be called by IO methods / synchronous IO execution, just before the + * IO is performed. + */ +void +pgaio_io_prepare_submit(PgAioHandle *ioh) +{ + pgaio_io_update_state(ioh, PGAIO_HS_SUBMITTED); + + dclist_push_tail(&pgaio_my_backend->in_flight_ios, &ioh->node); +} + +/* + * Handle IO getting completed by a method. + * + * Should be called by IO methods / synchronous IO execution, just after the + * IO has been performed. + * + * Expects to be called in a critical section. We expect IOs to be usable for + * WAL etc, which requires being able to execute completion callbacks in a + * critical section. + */ +void +pgaio_io_process_completion(PgAioHandle *ioh, int result) +{ + Assert(ioh->state == PGAIO_HS_SUBMITTED); + + Assert(CritSectionCount > 0); + + ioh->result = result; + + pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_IO); + + pgaio_io_call_inj(ioh, "AIO_PROCESS_COMPLETION_BEFORE_SHARED"); + + pgaio_io_call_complete_shared(ioh); + + pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_SHARED); + + /* condition variable broadcast ensures state is visible before wakeup */ + ConditionVariableBroadcast(&ioh->cv); + + /* contains call to pgaio_io_call_complete_local() */ + if (ioh->owner_procno == MyProcNumber) + pgaio_io_reclaim(ioh); +} + +/* + * Has the IO completed and thus the IO handle been reused? + * + * This is useful when waiting for IO completion at a low level (e.g. in an IO + * method's ->wait_one() callback). + */ +bool +pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state) +{ + *state = ioh->state; + pg_read_barrier(); + + return ioh->generation != ref_generation; +} + +/* + * Wait for IO to complete. External code should never use this, outside of + * the AIO subsystem waits are only allowed via pgaio_wref_wait(). + */ +static void +pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation) +{ + PgAioHandleState state; + bool am_owner; + + am_owner = ioh->owner_procno == MyProcNumber; + + if (pgaio_io_was_recycled(ioh, ref_generation, &state)) + return; + + if (am_owner) + { + if (state != PGAIO_HS_SUBMITTED + && state != PGAIO_HS_COMPLETED_IO + && state != PGAIO_HS_COMPLETED_SHARED + && state != PGAIO_HS_COMPLETED_LOCAL) + { + elog(PANIC, "waiting for own IO in wrong state: %d", + state); + } + } + + while (true) + { + if (pgaio_io_was_recycled(ioh, ref_generation, &state)) + return; + + switch (state) + { + case PGAIO_HS_IDLE: + case PGAIO_HS_HANDED_OUT: + elog(ERROR, "IO in wrong state: %d", state); + break; + + case PGAIO_HS_SUBMITTED: + + /* + * If we need to wait via the IO method, do so now. Don't + * check via the IO method if the issuing backend is executing + * the IO synchronously. + */ + if (pgaio_method_ops->wait_one && !(ioh->flags & PGAIO_HF_SYNCHRONOUS)) + { + pgaio_method_ops->wait_one(ioh, ref_generation); + continue; + } + /* fallthrough */ + + /* waiting for owner to submit */ + case PGAIO_HS_DEFINED: + case PGAIO_HS_STAGED: + /* waiting for reaper to complete */ + /* fallthrough */ + case PGAIO_HS_COMPLETED_IO: + /* shouldn't be able to hit this otherwise */ + Assert(IsUnderPostmaster); + /* ensure we're going to get woken up */ + ConditionVariablePrepareToSleep(&ioh->cv); + + while (!pgaio_io_was_recycled(ioh, ref_generation, &state)) + { + if (state == PGAIO_HS_COMPLETED_SHARED || + state == PGAIO_HS_COMPLETED_LOCAL) + break; + ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_IO_COMPLETION); + } + + ConditionVariableCancelSleep(); + break; + + case PGAIO_HS_COMPLETED_SHARED: + case PGAIO_HS_COMPLETED_LOCAL: + /* see above */ + if (am_owner) + pgaio_io_reclaim(ioh); + return; + } + } +} + +/* + * Make IO handle ready to be reused after IO has completed or after the + * handle has been released without being used. + */ +static void +pgaio_io_reclaim(PgAioHandle *ioh) +{ + /* This is only ok if it's our IO */ + Assert(ioh->owner_procno == MyProcNumber); + Assert(ioh->state != PGAIO_HS_IDLE); + + /* + * It's a bit ugly, but right now the easiest place to put the execution + * of shared completion callbacks is this function, as we need to execute + * local callbacks just before reclaiming at multiple callsites. + */ + if (ioh->state == PGAIO_HS_COMPLETED_SHARED) + { + pgaio_io_call_complete_local(ioh); + pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_LOCAL); + } + + pgaio_debug_io(DEBUG4, ioh, + "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d", + pgaio_result_status_string(ioh->distilled_result.status), + ioh->distilled_result.id, + ioh->distilled_result.error_data, + ioh->result); + + /* if the IO has been defined, we might need to do more work */ + if (ioh->state != PGAIO_HS_HANDED_OUT) + { + dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node); + + if (ioh->report_return) + { + ioh->report_return->result = ioh->distilled_result; + ioh->report_return->target_data = ioh->target_data; + } + } + + if (ioh->resowner) + { + ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); + ioh->resowner = NULL; + } + + Assert(!ioh->resowner); + + ioh->op = PGAIO_OP_INVALID; + ioh->target = PGAIO_TID_INVALID; + ioh->flags = 0; + ioh->num_callbacks = 0; + ioh->handle_data_len = 0; + ioh->report_return = NULL; + ioh->result = 0; + ioh->distilled_result.status = ARS_UNKNOWN; + + /* XXX: the barrier is probably superfluous */ + pg_write_barrier(); + ioh->generation++; + + pgaio_io_update_state(ioh, PGAIO_HS_IDLE); + + /* + * We push the IO to the head of the idle IO list, that seems more cache + * efficient in cases where only a few IOs are used. + */ + dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node); +} + +/* + * Wait for an IO handle to become usable. + * + * This only really is useful for pgaio_io_acquire(). + */ +static void +pgaio_io_wait_for_free(void) +{ + int reclaimed = 0; + + pgaio_debug(DEBUG2, "waiting for self with %d pending", + pgaio_my_backend->num_staged_ios); + + /* + * First check if any of our IOs actually have completed - when using + * worker, that'll often be the case. We could do so as part of the loop + * below, but that'd potentially lead us to wait for some IO submitted + * before. + */ + for (int i = 0; i < io_max_concurrency; i++) + { + PgAioHandle *ioh = &pgaio_ctl->io_handles[pgaio_my_backend->io_handle_off + i]; + + if (ioh->state == PGAIO_HS_COMPLETED_SHARED) + { + pgaio_io_reclaim(ioh); + reclaimed++; + } + } + + if (reclaimed > 0) + return; + + /* + * If we have any unsubmitted IOs, submit them now. We'll start waiting in + * a second, so it's better they're in flight. This also addresses the + * edge-case that all IOs are unsubmitted. + */ + if (pgaio_my_backend->num_staged_ios > 0) + pgaio_submit_staged(); + + if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0) + elog(ERROR, "no free IOs despite no in-flight IOs"); + + /* + * Wait for the oldest in-flight IO to complete. + * + * XXX: Reusing the general IO wait is suboptimal, we don't need to wait + * for that specific IO to complete, we just need *any* IO to complete. + */ + { + PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, + &pgaio_my_backend->in_flight_ios); + + switch (ioh->state) + { + /* should not be in in-flight list */ + case PGAIO_HS_IDLE: + case PGAIO_HS_DEFINED: + case PGAIO_HS_HANDED_OUT: + case PGAIO_HS_STAGED: + case PGAIO_HS_COMPLETED_LOCAL: + elog(ERROR, "shouldn't get here with io:%d in state %d", + pgaio_io_get_id(ioh), ioh->state); + break; + + case PGAIO_HS_COMPLETED_IO: + case PGAIO_HS_SUBMITTED: + pgaio_debug_io(DEBUG2, ioh, + "waiting for free io with %d in flight", + dclist_count(&pgaio_my_backend->in_flight_ios)); + + /* + * In a more general case this would be racy, because the + * generation could increase after we read ioh->state above. + * But we are only looking at IOs by the current backend and + * the IO can only be recycled by this backend. + */ + pgaio_io_wait(ioh, ioh->generation); + break; + + case PGAIO_HS_COMPLETED_SHARED: + /* it's possible that another backend just finished this IO */ + pgaio_io_reclaim(ioh); + break; + } + + if (dclist_count(&pgaio_my_backend->idle_ios) == 0) + elog(PANIC, "no idle IO after waiting for IO to terminate"); + return; + } +} + +/* + * Internal - code outside of AIO should never need this and it'd be hard for + * such code to be safe. + */ +static PgAioHandle * +pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation) +{ + PgAioHandle *ioh; + + Assert(iow->aio_index < pgaio_ctl->io_handle_count); + + ioh = &pgaio_ctl->io_handles[iow->aio_index]; + + *ref_generation = ((uint64) iow->generation_upper) << 32 | + iow->generation_lower; + + Assert(*ref_generation != 0); + + return ioh; +} + +static const char * +pgaio_io_state_get_name(PgAioHandleState s) +{ +#define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym + switch (s) + { + PGAIO_HS_TOSTR_CASE(IDLE); + PGAIO_HS_TOSTR_CASE(HANDED_OUT); + PGAIO_HS_TOSTR_CASE(DEFINED); + PGAIO_HS_TOSTR_CASE(STAGED); + PGAIO_HS_TOSTR_CASE(SUBMITTED); + PGAIO_HS_TOSTR_CASE(COMPLETED_IO); + PGAIO_HS_TOSTR_CASE(COMPLETED_SHARED); + PGAIO_HS_TOSTR_CASE(COMPLETED_LOCAL); + } +#undef PGAIO_HS_TOSTR_CASE + + return NULL; /* silence compiler */ +} + +const char * +pgaio_io_get_state_name(PgAioHandle *ioh) +{ + return pgaio_io_state_get_name(ioh->state); +} + +const char * +pgaio_result_status_string(PgAioResultStatus rs) +{ + switch (rs) + { + case ARS_UNKNOWN: + return "UNKNOWN"; + case ARS_OK: + return "OK"; + case ARS_PARTIAL: + return "PARTIAL"; + case ARS_ERROR: + return "ERROR"; + } + + return NULL; /* silence compiler */ +} + + + +/* -------------------------------------------------------------------------------- + * Functions primarily related to IO Wait References + * -------------------------------------------------------------------------------- + */ + +/* + * Mark a wait reference as invalid + */ +void +pgaio_wref_clear(PgAioWaitRef *iow) +{ + iow->aio_index = PG_UINT32_MAX; +} + +/* Is the wait reference valid? */ +bool +pgaio_wref_valid(PgAioWaitRef *iow) +{ + return iow->aio_index != PG_UINT32_MAX; +} + +/* + * Similar to pgaio_io_get_id(), just for wait references. + */ +int +pgaio_wref_get_id(PgAioWaitRef *iow) +{ + Assert(pgaio_wref_valid(iow)); + return iow->aio_index; +} + +/* + * Wait for the IO to have completed. Can be called in any process, not just + * in the issuing backend. + */ +void +pgaio_wref_wait(PgAioWaitRef *iow) +{ + uint64 ref_generation; + PgAioHandle *ioh; + + ioh = pgaio_io_from_wref(iow, &ref_generation); + + pgaio_io_wait(ioh, ref_generation); +} + +/* + * Check if the referenced IO completed, without blocking. + */ +bool +pgaio_wref_check_done(PgAioWaitRef *iow) +{ + uint64 ref_generation; + PgAioHandleState state; + bool am_owner; + PgAioHandle *ioh; + + ioh = pgaio_io_from_wref(iow, &ref_generation); + + if (pgaio_io_was_recycled(ioh, ref_generation, &state)) + return true; + + if (state == PGAIO_HS_IDLE) + return true; + + am_owner = ioh->owner_procno == MyProcNumber; + + if (state == PGAIO_HS_COMPLETED_SHARED || + state == PGAIO_HS_COMPLETED_LOCAL) + { + if (am_owner) + pgaio_io_reclaim(ioh); + return true; + } + + /* + * XXX: It likely would be worth checking in with the io method, to give + * the IO method a chance to check if there are completion events queued. + */ + + return false; +} + + + +/* -------------------------------------------------------------------------------- + * Actions on multiple IOs. + * -------------------------------------------------------------------------------- + */ + +/* + * Submit IOs in batches going forward. + * + * Submitting multiple IOs at once can be substantially faster than doing so + * one-by-one. At the same time, submitting multiple IOs at once requires more + * care to avoid deadlocks. + * + * Consider backend A staging an IO for buffer 1 and then trying to start IO + * on buffer 2, while backend B does the inverse. If A submitted the IO before + * moving on to buffer 2, this works just fine, B will wait for the IO to + * complete. But if batching were used, each backend will wait for IO that has + * not yet been submitted to complete, i.e. forever. + * + * End batch submission mode with pgaio_exit_batchmode(). (Throwing errors is + * allowed; error recovery will end the batch.) + * + * To avoid deadlocks, code needs to ensure that it will not wait for another + * backend while there is unsubmitted IO. E.g. by using conditional lock + * acquisition when acquiring buffer locks. To check if there currently are + * staged IOs, call pgaio_have_staged() and to submit all staged IOs call + * pgaio_submit_staged(). + * + * It is not allowed to enter batchmode while already in batchmode, it's + * unlikely to ever be needed, as code needs to be explicitly aware of being + * called in batchmode, to avoid the deadlock risks explained above. + * + * Note that IOs may get submitted before pgaio_exit_batchmode() is called, + * e.g. because too many IOs have been staged or because pgaio_submit_staged() + * was called. + */ +void +pgaio_enter_batchmode(void) +{ + if (pgaio_my_backend->in_batchmode) + elog(ERROR, "starting batch while batch already in progress"); + pgaio_my_backend->in_batchmode = true; +} + +/* + * Stop submitting IOs in batches. + */ +void +pgaio_exit_batchmode(void) +{ + Assert(pgaio_my_backend->in_batchmode); + + pgaio_submit_staged(); + pgaio_my_backend->in_batchmode = false; +} + +/* + * Are there staged but unsubmitted IOs? + * + * See comment above pgaio_enter_batchmode() for why code may need to check if + * there is IO in that state. + */ +bool +pgaio_have_staged(void) +{ + Assert(pgaio_my_backend->in_batchmode || + pgaio_my_backend->num_staged_ios == 0); + return pgaio_my_backend->num_staged_ios > 0; +} + +/* + * Submit all staged but not yet submitted IOs. + * + * Unless in batch mode, this never needs to be called, as IOs get submitted + * as soon as possible. While in batchmode pgaio_submit_staged() can be called + * before waiting on another backend, to avoid the risk of deadlocks. See + * pgaio_enter_batchmode(). + */ +void +pgaio_submit_staged(void) +{ + int total_submitted = 0; + int did_submit; + + if (pgaio_my_backend->num_staged_ios == 0) + return; + + + START_CRIT_SECTION(); + + did_submit = pgaio_method_ops->submit(pgaio_my_backend->num_staged_ios, + pgaio_my_backend->staged_ios); + + END_CRIT_SECTION(); + + total_submitted += did_submit; + + Assert(total_submitted == did_submit); + + pgaio_my_backend->num_staged_ios = 0; + + pgaio_debug(DEBUG4, + "aio: submitted %d IOs", + total_submitted); +} + + + +/* -------------------------------------------------------------------------------- + * Other + * -------------------------------------------------------------------------------- + */ + + /* * Perform AIO related cleanup after an error. * @@ -50,6 +1049,22 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) void pgaio_error_cleanup(void) { + /* + * It is possible that code errored out after pgaio_enter_batchmode() but + * before pgaio_exit_batchmode() was called. In that case we need to + * submit the IO now. + */ + if (pgaio_my_backend->in_batchmode) + { + pgaio_my_backend->in_batchmode = false; + + pgaio_submit_staged(); + } + + /* + * As we aren't in batchmode, there shouldn't be any unsubmitted IOs. + */ + Assert(pgaio_my_backend->num_staged_ios == 0); } /* @@ -62,11 +1077,86 @@ pgaio_error_cleanup(void) void AtEOXact_Aio(bool is_commit) { + /* + * We should never be in batch mode at transactional boundaries. In case + * an error was thrown while in batch mode, pgaio_error_cleanup() should + * have exited batchmode. + * + * In case we are in batchmode somehow, make sure to submit all staged + * IOs, other backends may need them to complete to continue. + */ + if (pgaio_my_backend->in_batchmode) + { + pgaio_error_cleanup(); + elog(WARNING, "open AIO batch at end of (sub-)transaction"); + } + + /* + * As we aren't in batchmode, there shouldn't be any unsubmitted IOs. + */ + Assert(pgaio_my_backend->num_staged_ios == 0); +} + +/* + * Need to submit staged but not yet submitted IOs using the fd, otherwise + * the IO would end up targeting something bogus. + */ +void +pgaio_closing_fd(int fd) +{ + /* + * Might be called before AIO is initialized or in a subprocess that + * doesn't use AIO. + */ + if (!pgaio_my_backend) + return; + + /* + * For now just submit all staged IOs - we could be more selective, but + * it's probably not worth it. + */ + pgaio_submit_staged(); +} + +/* + * Registered as before_shmem_exit() callback in pgaio_init_backend() + */ +void +pgaio_shutdown(int code, Datum arg) +{ + Assert(pgaio_my_backend); + Assert(!pgaio_my_backend->handed_out_io); + + /* first clean up resources as we would at a transaction boundary */ + AtEOXact_Aio(code == 0); + + /* + * Before exiting, make sure that all IOs are finished. That has two main + * purposes: + * + * - Some kernel-level AIO mechanisms don't deal well with the issuer of + * an AIO exiting before IO completed + * + * - It'd be confusing to see partially finished IOs in stats views etc + */ + while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios)) + { + PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios); + + /* see comment in pgaio_io_wait_for_free() about raciness */ + pgaio_io_wait(ioh, ioh->generation); + } + + pgaio_my_backend = NULL; } void assign_io_method(int newval, void *extra) { + Assert(pgaio_method_ops_table[newval] != NULL); + Assert(newval < lengthof(io_method_options)); + + pgaio_method_ops = pgaio_method_ops_table[newval]; } bool @@ -88,3 +1178,43 @@ check_io_max_concurrency(int *newval, void **extra, GucSource source) return true; } + + + +/* -------------------------------------------------------------------------------- + * Injection point support + * -------------------------------------------------------------------------------- + */ + +#ifdef USE_INJECTION_POINTS + +/* + * Call injection point with support for pgaio_inj_io_get(). + */ +void +pgaio_io_call_inj(PgAioHandle *ioh, const char *injection_point) +{ + pgaio_inj_cur_handle = ioh; + + PG_TRY(); + { + InjectionPointCached(injection_point); + } + PG_FINALLY(); + { + pgaio_inj_cur_handle = NULL; + } + PG_END_TRY(); +} + +/* + * Return IO associated with injection point invocation. This is only needed + * as injection points currently don't support arguments. + */ +PgAioHandle * +pgaio_inj_io_get(void) +{ + return pgaio_inj_cur_handle; +} + +#endif diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c new file mode 100644 index 00000000000..d5a2cca28f1 --- /dev/null +++ b/src/backend/storage/aio/aio_callback.c @@ -0,0 +1,308 @@ +/*------------------------------------------------------------------------- + * + * aio_callback.c + * AIO - Functionality related to callbacks that can be registered on IO + * Handles + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/aio_callback.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/aio.h" +#include "storage/aio_internal.h" + + +/* just to have something to put into aio_handle_cbs */ +static const PgAioHandleCallbacks aio_invalid_cb = {0}; + +typedef struct PgAioHandleCallbacksEntry +{ + const PgAioHandleCallbacks *const cb; + const char *const name; +} PgAioHandleCallbacksEntry; + +/* + * Callback definition for the callbacks that can be registered on an IO + * handle. See PgAioHandleCallbackID's definition for an explanation for why + * callbacks are not identified by a pointer. + */ +static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { +#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback} + CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), +#undef CALLBACK_ENTRY +}; + + + +/* -------------------------------------------------------------------------------- + * Public callback related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Register callback for the IO handle. + * + * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be + * registered for each IO. + * + * Callbacks need to be registered before [indirectly] calling + * pgaio_io_prep_*(), as the IO may be executed immediately. + * + * A callback can be passed a small bit of data, e.g. to indicate whether to + * zero a buffer if it is invalid. + * + * + * Note that callbacks are executed in critical sections. This is necessary + * to be able to execute IO in critical sections (consider e.g. WAL + * logging). To perform AIO we first need to acquire a handle, which, if there + * are no free handles, requires waiting for IOs to complete and to execute + * their completion callbacks. + * + * Callbacks may be executed in the issuing backend but also in another + * backend (because that backend is waiting for the IO) or in IO workers (if + * io_method=worker is used). + * + * + * See PgAioHandleCallbackID's definition for an explanation for why + * callbacks are not identified by a pointer. + */ +void +pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id, + uint8 cb_data) +{ + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (cb_id >= lengthof(aio_handle_cbs)) + elog(ERROR, "callback %d is out of range", cb_id); + if (aio_handle_cbs[cb_id].cb->complete_shared == NULL && + aio_handle_cbs[cb_id].cb->complete_local == NULL) + elog(ERROR, "callback %d does not have a completion callback", cb_id); + if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS) + elog(PANIC, "too many callbacks, the max is %d", + PGAIO_HANDLE_MAX_CALLBACKS); + ioh->callbacks[ioh->num_callbacks] = cb_id; + ioh->callbacks_data[ioh->num_callbacks] = cb_data; + + pgaio_debug_io(DEBUG3, ioh, + "adding cb #%d, id %d/%s", + ioh->num_callbacks + 1, + cb_id, ce->name); + + ioh->num_callbacks++; +} + +/* + * Associate an array of data with the Handle. This is e.g. useful to the + * transport knowledge about which buffers a multi-block IO affects to + * completion callbacks. + * + * Right now this can be done only once for each IO, even though multiple + * callbacks can be registered. There aren't any known usecases requiring more + * and the required amount of shared memory does add up, so it doesn't seem + * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS. + */ +void +pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(ioh->handle_data_len == 0); + Assert(len <= PG_IOV_MAX); + + for (int i = 0; i < len; i++) + pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i]; + ioh->handle_data_len = len; +} + +/* + * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit + * array to a 64bit array. Without it callers would end up needing to + * open-code equivalent code. + */ +void +pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(ioh->handle_data_len == 0); + Assert(len <= PG_IOV_MAX); + + for (int i = 0; i < len; i++) + pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i]; + ioh->handle_data_len = len; +} + +/* + * Return data set with pgaio_io_set_handle_data_*(). + */ +uint64 * +pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len) +{ + Assert(ioh->handle_data_len > 0); + + *len = ioh->handle_data_len; + + return &pgaio_ctl->handle_data[ioh->iovec_off]; +} + + + +/* -------------------------------------------------------------------------------- + * Public IO Result related functions + * -------------------------------------------------------------------------------- + */ + +void +pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel) +{ + PgAioHandleCallbackID cb_id = result.id; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + Assert(result.status != ARS_UNKNOWN); + Assert(result.status != ARS_OK); + + if (ce->cb->report == NULL) + elog(ERROR, "callback %d/%s does not have report callback", + result.id, ce->name); + + ce->cb->report(result, target_data, elevel); +} + + + +/* -------------------------------------------------------------------------------- + * Internal callback related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Internal function which invokes ->stage for all the registered callbacks. + */ +void +pgaio_io_call_stage(PgAioHandle *ioh) +{ + Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT); + + for (int i = ioh->num_callbacks; i > 0; i--) + { + PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1]; + uint8 cb_data = ioh->callbacks_data[i - 1]; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (!ce->cb->stage) + continue; + + pgaio_debug_io(DEBUG3, ioh, + "calling cb #%d %d/%s->stage(%u)", + i, cb_id, ce->name, cb_data); + ce->cb->stage(ioh, cb_data); + } +} + +/* + * Internal function which invokes ->complete_shared for all the registered + * callbacks. + */ +void +pgaio_io_call_complete_shared(PgAioHandle *ioh) +{ + PgAioResult result; + + START_CRIT_SECTION(); + + Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT); + + result.status = ARS_OK; /* low level IO is always considered OK */ + result.result = ioh->result; + result.id = PGAIO_HCB_INVALID; + result.error_data = 0; + + /* + * Call callbacks with the last registered (innermost) callback first. + * Each callback can modify the result forwarded to the next callback. + */ + for (int i = ioh->num_callbacks; i > 0; i--) + { + PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1]; + uint8 cb_data = ioh->callbacks_data[i - 1]; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (!ce->cb->complete_shared) + continue; + + pgaio_debug_io(DEBUG4, ioh, + "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)", + i, cb_id, ce->name, + cb_data, + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result); + result = ce->cb->complete_shared(ioh, result, cb_data); + } + + ioh->distilled_result = result; + + pgaio_debug_io(DEBUG3, ioh, + "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d", + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result, + ioh->result); + + END_CRIT_SECTION(); +} + +/* + * Internal function which invokes ->complete_local for all the registered + * callbacks. + * + * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared(). + */ +void +pgaio_io_call_complete_local(PgAioHandle *ioh) +{ + PgAioResult result; + + START_CRIT_SECTION(); + + Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT); + + /* start with distilled result from shared callback */ + result = ioh->distilled_result; + + for (int i = ioh->num_callbacks; i > 0; i--) + { + PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1]; + uint8 cb_data = ioh->callbacks_data[i - 1]; + const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id]; + + if (!ce->cb->complete_local) + continue; + + pgaio_debug_io(DEBUG4, ioh, + "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d", + i, cb_id, ce->name, cb_data, + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result); + result = ce->cb->complete_local(ioh, result, cb_data); + } + + /* + * Note that we don't save the result in ioh->distilled_result, the local + * callback's result should not ever matter to other waiters. + */ + pgaio_debug_io(DEBUG3, ioh, + "after local completion: distilled result: (status %s, id %u, error_data %d, result %d), raw_result: %d", + pgaio_result_status_string(result.status), + result.id, result.error_data, result.result, + ioh->result); + + END_CRIT_SECTION(); +} diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c index aeacc144149..6fe55510fae 100644 --- a/src/backend/storage/aio/aio_init.c +++ b/src/backend/storage/aio/aio_init.c @@ -14,24 +14,222 @@ #include "postgres.h" +#include "miscadmin.h" +#include "storage/aio.h" +#include "storage/aio_internal.h" #include "storage/aio_subsys.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "utils/guc.h" +static Size +AioCtlShmemSize(void) +{ + Size sz; + + /* pgaio_ctl itself */ + sz = offsetof(PgAioCtl, io_handles); + + return sz; +} + +static uint32 +AioProcs(void) +{ + return MaxBackends + NUM_AUXILIARY_PROCS; +} + +static Size +AioBackendShmemSize(void) +{ + return mul_size(AioProcs(), sizeof(PgAioBackend)); +} + +static Size +AioHandleShmemSize(void) +{ + Size sz; + + /* verify AioChooseMaxConcurrency() did its thing */ + Assert(io_max_concurrency > 0); + + /* io handles */ + sz = mul_size(AioProcs(), + mul_size(io_max_concurrency, sizeof(PgAioHandle))); + + return sz; +} + +static Size +AioHandleIOVShmemSize(void) +{ + /* + * Each IO handle can have an PG_IOV_MAX long iovec. + * + * XXX: Right now the amount of space available for each IO is PG_IOV_MAX. + * While it's tempting to use the io_combine_limit GUC, that's + * PGC_USERSET, so we can't allocate shared memory based on that. + */ + return mul_size(sizeof(struct iovec), + mul_size(mul_size(PG_IOV_MAX, AioProcs()), + io_max_concurrency)); +} + +static Size +AioHandleDataShmemSize(void) +{ + /* each buffer referenced by an iovec can have associated data */ + return mul_size(sizeof(uint64), + mul_size(mul_size(PG_IOV_MAX, AioProcs()), + io_max_concurrency)); +} + +/* + * Choose a suitable value for io_max_concurrency. + * + * It's unlikely that we could have more IOs in flight than buffers that we + * would be allowed to pin. + * + * On the upper end, apply a cap too - just because shared_buffers is large, + * it doesn't make sense have millions of buffers undergo IO concurrently. + */ +static int +AioChooseMaxConcurrency(void) +{ + uint32 max_backends; + int max_proportional_pins; + + /* Similar logic to LimitAdditionalPins() */ + max_backends = MaxBackends + NUM_AUXILIARY_PROCS; + max_proportional_pins = NBuffers / max_backends; + + max_proportional_pins = Max(max_proportional_pins, 1); + + /* apply upper limit */ + return Min(max_proportional_pins, 64); +} + Size AioShmemSize(void) { Size sz = 0; + /* + * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT. + * However, if the DBA explicitly set io_max_concurrency = -1 in the + * config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and + * we must force the matter with PGC_S_OVERRIDE. + */ + if (io_max_concurrency == -1) + { + char buf[32]; + + snprintf(buf, sizeof(buf), "%d", AioChooseMaxConcurrency()); + SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER, + PGC_S_DYNAMIC_DEFAULT); + if (io_max_concurrency == -1) /* failed to apply it? */ + SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + + sz = add_size(sz, AioCtlShmemSize()); + sz = add_size(sz, AioBackendShmemSize()); + sz = add_size(sz, AioHandleShmemSize()); + sz = add_size(sz, AioHandleIOVShmemSize()); + sz = add_size(sz, AioHandleDataShmemSize()); + + /* Reserve space for method specific resources. */ + if (pgaio_method_ops->shmem_size) + sz = add_size(sz, pgaio_method_ops->shmem_size()); + return sz; } void AioShmemInit(void) { + bool found; + uint32 io_handle_off = 0; + uint32 iovec_off = 0; + uint32 per_backend_iovecs = io_max_concurrency * PG_IOV_MAX; + + pgaio_ctl = (PgAioCtl *) + ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found); + + if (found) + goto out; + + memset(pgaio_ctl, 0, AioCtlShmemSize()); + + pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency; + pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs; + + pgaio_ctl->backend_state = (PgAioBackend *) + ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found); + + pgaio_ctl->io_handles = (PgAioHandle *) + ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found); + + pgaio_ctl->iovecs = (struct iovec *) + ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found); + pgaio_ctl->handle_data = (uint64 *) + ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found); + + for (int procno = 0; procno < AioProcs(); procno++) + { + PgAioBackend *bs = &pgaio_ctl->backend_state[procno]; + + bs->io_handle_off = io_handle_off; + io_handle_off += io_max_concurrency; + + dclist_init(&bs->idle_ios); + memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE); + dclist_init(&bs->in_flight_ios); + + /* initialize per-backend IOs */ + for (int i = 0; i < io_max_concurrency; i++) + { + PgAioHandle *ioh = &pgaio_ctl->io_handles[bs->io_handle_off + i]; + + ioh->generation = 1; + ioh->owner_procno = procno; + ioh->iovec_off = iovec_off; + ioh->handle_data_len = 0; + ioh->report_return = NULL; + ioh->resowner = NULL; + ioh->num_callbacks = 0; + ioh->distilled_result.status = ARS_UNKNOWN; + ioh->flags = 0; + + ConditionVariableInit(&ioh->cv); + + dclist_push_tail(&bs->idle_ios, &ioh->node); + iovec_off += PG_IOV_MAX; + } + } + +out: + /* Initialize IO method specific resources. */ + if (pgaio_method_ops->shmem_init) + pgaio_method_ops->shmem_init(!found); } void pgaio_init_backend(void) { + /* shouldn't be initialized twice */ + Assert(!pgaio_my_backend); + + if (MyProc == NULL || MyProcNumber >= AioProcs()) + elog(ERROR, "aio requires a normal PGPROC"); + + pgaio_my_backend = &pgaio_ctl->backend_state[MyProcNumber]; + + if (pgaio_method_ops->init_backend) + pgaio_method_ops->init_backend(); + + before_shmem_exit(pgaio_shutdown, 0); } diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c new file mode 100644 index 00000000000..36d2c1f492d --- /dev/null +++ b/src/backend/storage/aio/aio_io.c @@ -0,0 +1,184 @@ +/*------------------------------------------------------------------------- + * + * aio_io.c + * AIO - Low Level IO Handling + * + * Functions related to associating IO operations to IO Handles and IO-method + * independent support functions for actually performing IO. + * + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/aio_io.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/aio.h" +#include "storage/aio_internal.h" +#include "storage/fd.h" +#include "utils/wait_event.h" + + +static void pgaio_io_before_prep(PgAioHandle *ioh); + + + +/* -------------------------------------------------------------------------------- + * Public IO related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Scatter/gather IO needs to associate an iovec with the Handle. To support + * worker mode this data needs to be in shared memory. + */ +int +pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + + *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + + return PG_IOV_MAX; +} + +PgAioOp +pgaio_io_get_op(PgAioHandle *ioh) +{ + return ioh->op; +} + +PgAioOpData * +pgaio_io_get_op_data(PgAioHandle *ioh) +{ + return &ioh->op_data; +} + + + +/* -------------------------------------------------------------------------------- + * "Preparation" routines for individual IO operations + * + * These are called by the code actually initiating an IO, to associate the IO + * specific data with an AIO handle. + * + * Each of the preparation routines first needs to call + * pgaio_io_before_prep(), then fill IO specific fields in the handle and then + * finally call pgaio_io_stage(). + * -------------------------------------------------------------------------------- + */ + +void +pgaio_io_prep_readv(PgAioHandle *ioh, + int fd, int iovcnt, uint64 offset) +{ + pgaio_io_before_prep(ioh); + + ioh->op_data.read.fd = fd; + ioh->op_data.read.offset = offset; + ioh->op_data.read.iov_length = iovcnt; + + pgaio_io_stage(ioh, PGAIO_OP_READV); +} + +void +pgaio_io_prep_writev(PgAioHandle *ioh, + int fd, int iovcnt, uint64 offset) +{ + pgaio_io_before_prep(ioh); + + ioh->op_data.write.fd = fd; + ioh->op_data.write.offset = offset; + ioh->op_data.write.iov_length = iovcnt; + + pgaio_io_stage(ioh, PGAIO_OP_WRITEV); +} + + + +/* -------------------------------------------------------------------------------- + * Internal IO related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Execute IO operation synchronously. This is implemented here, not in + * method_sync.c, because other IO methods also might use it / fall back to + * it. + */ +void +pgaio_io_perform_synchronously(PgAioHandle *ioh) +{ + ssize_t result = 0; + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + + START_CRIT_SECTION(); + + /* Perform IO. */ + switch (ioh->op) + { + case PGAIO_OP_READV: + pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_READ); + result = pg_preadv(ioh->op_data.read.fd, iov, + ioh->op_data.read.iov_length, + ioh->op_data.read.offset); + pgstat_report_wait_end(); + break; + case PGAIO_OP_WRITEV: + pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_WRITE); + result = pg_pwritev(ioh->op_data.write.fd, iov, + ioh->op_data.write.iov_length, + ioh->op_data.write.offset); + pgstat_report_wait_end(); + break; + case PGAIO_OP_INVALID: + elog(ERROR, "trying to execute invalid IO operation"); + } + + ioh->result = result < 0 ? -errno : result; + + pgaio_io_process_completion(ioh, ioh->result); + + END_CRIT_SECTION(); +} + +/* + * Helper function to be called by IO operation preparation functions, before + * any data in the handle is set. Mostly to centralize assertions. + */ +static void +pgaio_io_before_prep(PgAioHandle *ioh) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(pgaio_my_backend->handed_out_io == ioh); + Assert(pgaio_io_has_target(ioh)); + Assert(ioh->op == PGAIO_OP_INVALID); +} + +/* + * Could be made part of the public interface, but it's not clear there's + * really a use case for that. + */ +const char * +pgaio_io_get_op_name(PgAioHandle *ioh) +{ + Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT); + + switch (ioh->op) + { + case PGAIO_OP_INVALID: + return "invalid"; + case PGAIO_OP_READV: + return "read"; + case PGAIO_OP_WRITEV: + return "write"; + } + + return NULL; /* silence compiler */ +} diff --git a/src/backend/storage/aio/aio_target.c b/src/backend/storage/aio/aio_target.c new file mode 100644 index 00000000000..b01406a6a52 --- /dev/null +++ b/src/backend/storage/aio/aio_target.c @@ -0,0 +1,114 @@ +/*------------------------------------------------------------------------- + * + * aio_target.c + * AIO - Functionality related to executing IO for different targets + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/aio_target.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "storage/aio.h" +#include "storage/aio_internal.h" + + +/* + * Registry for entities that can be the target of AIO. + */ +static const PgAioTargetInfo *pgaio_target_info[] = { + [PGAIO_TID_INVALID] = &(PgAioTargetInfo) { + .name = "invalid", + }, +}; + + + +/* -------------------------------------------------------------------------------- + * Public target related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +bool +pgaio_io_has_target(PgAioHandle *ioh) +{ + return ioh->target != PGAIO_TID_INVALID; +} + +/* + * Return the name for the target associated with the IO. Mostly useful for + * debugging/logging. + */ +const char * +pgaio_io_get_target_name(PgAioHandle *ioh) +{ + Assert(ioh->target >= 0 && ioh->target < PGAIO_TID_COUNT); + + return pgaio_target_info[ioh->target]->name; +} + +/* + * Assign a target to the IO. + * + * This has to be called exactly once before pgaio_io_prep_*() is called. + */ +void +pgaio_io_set_target(PgAioHandle *ioh, PgAioTargetID targetid) +{ + Assert(ioh->state == PGAIO_HS_HANDED_OUT); + Assert(ioh->target == PGAIO_TID_INVALID); + + ioh->target = targetid; +} + +PgAioTargetData * +pgaio_io_get_target_data(PgAioHandle *ioh) +{ + return &ioh->target_data; +} + +/* + * Return a stringified description of the IO's target. + * + * The string is localized and allocated in the current memory context. + */ +char * +pgaio_io_get_target_description(PgAioHandle *ioh) +{ + return pgaio_target_info[ioh->target]->describe_identity(&ioh->target_data); +} + + + +/* -------------------------------------------------------------------------------- + * Internal target related functions operating on IO Handles + * -------------------------------------------------------------------------------- + */ + +/* + * Internal: Check if pgaio_io_reopen() is available for the IO. + */ +bool +pgaio_io_can_reopen(PgAioHandle *ioh) +{ + return pgaio_target_info[ioh->target]->reopen != NULL; +} + +/* + * Internal: Before executing an IO outside of the context of the process the + * IO has been prepared in, the file descriptor has to be reopened - any FD + * referenced in the IO itself, won't be valid in the separate process. + */ +void +pgaio_io_reopen(PgAioHandle *ioh) +{ + Assert(ioh->target >= 0 && ioh->target < PGAIO_TID_COUNT); + Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT); + + pgaio_target_info[ioh->target]->reopen(ioh); +} diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build index c822fd4ddf7..2c26089d52e 100644 --- a/src/backend/storage/aio/meson.build +++ b/src/backend/storage/aio/meson.build @@ -2,6 +2,10 @@ backend_sources += files( 'aio.c', + 'aio_callback.c', 'aio_init.c', + 'aio_io.c', + 'aio_target.c', + 'method_sync.c', 'read_stream.c', ) diff --git a/src/backend/storage/aio/method_sync.c b/src/backend/storage/aio/method_sync.c new file mode 100644 index 00000000000..902c2428d41 --- /dev/null +++ b/src/backend/storage/aio/method_sync.c @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * method_sync.c + * AIO - perform "AIO" by executing it synchronously + * + * This method is mainly to check if AIO use causes regressions. Other IO + * methods might also fall back to the synchronous method for functionality + * they cannot provide. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/method_sync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "storage/aio.h" +#include "storage/aio_internal.h" + +static bool pgaio_sync_needs_synchronous_execution(PgAioHandle *ioh); +static int pgaio_sync_submit(uint16 num_staged_ios, PgAioHandle **staged_ios); + + +const IoMethodOps pgaio_sync_ops = { + .needs_synchronous_execution = pgaio_sync_needs_synchronous_execution, + .submit = pgaio_sync_submit, +}; + + + +static bool +pgaio_sync_needs_synchronous_execution(PgAioHandle *ioh) +{ + return true; +} + +static int +pgaio_sync_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) +{ + elog(ERROR, "IO should have been executed synchronously"); + + return 0; +} diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 3c594415bfd..b44e4908b25 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -191,6 +191,7 @@ ABI_compatibility: Section: ClassName - WaitEventIO +AIO_IO_COMPLETION "Waiting for IO completion." BASEBACKUP_READ "Waiting for base backup to read from a file." BASEBACKUP_SYNC "Waiting for data written by a base backup to reach durable storage." BASEBACKUP_WRITE "Waiting for base backup to write to a file." diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index e79d5343038..f48a4962089 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -3,6 +3,10 @@ * aio.h * Main AIO interface * + * This is the header to include when actually issuing AIO. When just + * declaring functions involving an AIO related type, it might suffice to + * include aio_types.h. Initialization related functions are in the dedicated + * aio_init.h. * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -14,6 +18,9 @@ #ifndef AIO_H #define AIO_H +#include "storage/aio_types.h" +#include "storage/procnumber.h" + /* Enum for io_method GUC. */ @@ -26,9 +33,313 @@ typedef enum IoMethod #define DEFAULT_IO_METHOD IOMETHOD_SYNC +/* + * Flags for an IO that can be set with pgaio_io_set_flag(). + */ +typedef enum PgAioHandleFlags +{ + /* + * The IO references backend local memory. + * + * This needs to be set on an IO whenever the IO references process-local + * memory. Some IO methods do not support executing IO that references + * process local memory and thus need to fall back to executing IO + * synchronously for IOs with this flag set. + * + * Required for correctness. + */ + PGAIO_HF_REFERENCES_LOCAL = 1 << 1, + + /* + * Hint that IO will be executed synchronously. + * + * This can make it a bit cheaper to execute synchronous IO via the AIO + * interface, to avoid needing an AIO and non-AIO version of code. + * + * Advantageous to set, if applicable, but not required for correctness. + */ + PGAIO_HF_SYNCHRONOUS = 1 << 0, + + /* + * IO is using buffered IO, used to control heuristic in some IO methods. + * + * Advantageous to set, if applicable, but not required for correctness. + */ + PGAIO_HF_BUFFERED = 1 << 2, +} PgAioHandleFlags; + +/* + * The IO operations supported by the AIO subsystem. + * + * This could be in aio_internal.h, as it is not pubicly referenced, but + * PgAioOpData currently *does* need to be public, therefore keeping this + * public seems to make sense. + */ +typedef enum PgAioOp +{ + /* intentionally the zero value, to help catch zeroed memory etc */ + PGAIO_OP_INVALID = 0, + + PGAIO_OP_READV, + PGAIO_OP_WRITEV, + + /** + * In the near term we'll need at least: + * - fsync / fdatasync + * - flush_range + * + * Eventually we'll additionally want at least: + * - send + * - recv + * - accept + **/ +} PgAioOp; + +#define PGAIO_OP_COUNT (PGAIO_OP_WRITEV + 1) + + +/* + * On what is IO being performed? + * + * PgAioTargetID specific behaviour should be implemented in + * aio_target.c. + */ +typedef enum PgAioTargetID +{ + /* intentionally the zero value, to help catch zeroed memory etc */ + PGAIO_TID_INVALID = 0, +} PgAioTargetID; + +#define PGAIO_TID_COUNT (PGAIO_TID_INVALID + 1) + + +/* + * Data necessary for support IO operations (see PgAioOp). + * + * NB: Note that the FDs in here may *not* be relied upon for re-issuing + * requests (e.g. for partial reads/writes or in an IO worker) - the FD might + * be from another process, or closed since. That's not a problem for staged + * IOs, as all staged IOs are submitted when closing an FD. + */ +typedef union +{ + struct + { + int fd; + uint16 iov_length; + uint64 offset; + } read; + + struct + { + int fd; + uint16 iov_length; + uint64 offset; + } write; +} PgAioOpData; + + +/* + * Information the object that IO is executed on. Mostly callbacks that + * operate on PgAioTargetData. + * + * typedef is in aio_types.h + */ +struct PgAioTargetInfo +{ + /* + * To support executing using worker processes, the file descriptor for an + * IO may need to be be reopened in a different process. + */ + void (*reopen) (PgAioHandle *ioh); + + /* describe the target of the IO, used for log messages and views */ + char *(*describe_identity) (const PgAioTargetData *sd); + + /* name of the target, used in log messages / views */ + const char *name; +}; + + +/* + * IDs for callbacks that can be registered on an IO. + * + * Callbacks are identified by an ID rather than a function pointer. There are + * two main reasons: + * + * 1) Memory within PgAioHandle is precious, due to the number of PgAioHandle + * structs in pre-allocated shared memory. + * + * 2) Due to EXEC_BACKEND function pointers are not necessarily stable between + * different backends, therefore function pointers cannot directly be in + * shared memory. + * + * Without 2), we could fairly easily allow to add new callbacks, by filling a + * ID->pointer mapping table on demand. In the presence of 2 that's still + * doable, but harder, because every process has to re-register the pointers + * so that a local ID->"backend local pointer" mapping can be maintained. + */ +typedef enum PgAioHandleCallbackID +{ + PGAIO_HCB_INVALID, +} PgAioHandleCallbackID; + + +typedef void (*PgAioHandleCallbackStage) (PgAioHandle *ioh, uint8 cb_flags); +typedef PgAioResult (*PgAioHandleCallbackComplete) (PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_flags); +typedef void (*PgAioHandleCallbackReport) (PgAioResult result, const PgAioTargetData *target_data, int elevel); + +/* typedef is in aio_types.h */ +struct PgAioHandleCallbacks +{ + /* + * Prepare resources affected by the IO for execution. This could e.g. + * include moving ownership of buffer pins to the AIO subsystem. + */ + PgAioHandleCallbackStage stage; + + /* + * Update the state of resources affected by the IO to reflect completion + * of the IO. This could e.g. include updating shared buffer state to + * signal the IO has finished. + * + * The _shared suffix indicates that this is executed by the backend that + * completed the IO, which may or may not be the backend that issued the + * IO. Obviously the callback thus can only modify resources in shared + * memory. + * + * The latest registered callback is called first. This allows + * higher-level code to register callbacks that can rely on callbacks + * registered by lower-level code to already have been executed. + * + * NB: This is called in a critical section. Errors can be signalled by + * the callback's return value, it's the responsibility of the IO's issuer + * to react appropriately. + */ + PgAioHandleCallbackComplete complete_shared; + + /* + * Like complete_shared, except called in the issuing backend. + * + * This variant of the completion callback is useful when backend-local + * state has to be updated to reflect the IO's completion. E.g. a + * temporary buffer's BufferDesc isn't accessible in complete_shared. + * + * Local callbacks are only called after complete_shared for all + * registered callbacks has been called. + */ + PgAioHandleCallbackComplete complete_local; + + /* + * Report the result of an IO operation. This is e.g. used to raise an + * error after an IO failed at the appropriate time (i.e. not when the IO + * failed, but under control of the code that issued the IO). + */ + PgAioHandleCallbackReport report; +}; + + + +/* + * How many callbacks can be registered for one IO handle. Currently we only + * need two, but it's not hard to imagine needing a few more. + */ +#define PGAIO_HANDLE_MAX_CALLBACKS 4 + + + +/* -------------------------------------------------------------------------------- + * IO Handles + * -------------------------------------------------------------------------------- + */ + +/* functions in aio.c */ +struct ResourceOwnerData; +extern PgAioHandle *pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret); +extern PgAioHandle *pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret); + +extern void pgaio_io_release(PgAioHandle *ioh); struct dlist_node; extern void pgaio_io_release_resowner(struct dlist_node *ioh_node, bool on_error); +extern void pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag); + +extern int pgaio_io_get_id(PgAioHandle *ioh); +extern ProcNumber pgaio_io_get_owner(PgAioHandle *ioh); + +extern void pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow); + +/* functions in aio_io.c */ +struct iovec; +extern int pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov); + +extern PgAioOp pgaio_io_get_op(PgAioHandle *ioh); +extern PgAioOpData *pgaio_io_get_op_data(PgAioHandle *ioh); + +extern void pgaio_io_prep_readv(PgAioHandle *ioh, + int fd, int iovcnt, uint64 offset); +extern void pgaio_io_prep_writev(PgAioHandle *ioh, + int fd, int iovcnt, uint64 offset); + +/* functions in aio_target.c */ +extern void pgaio_io_set_target(PgAioHandle *ioh, PgAioTargetID targetid); +extern bool pgaio_io_has_target(PgAioHandle *ioh); +extern PgAioTargetData *pgaio_io_get_target_data(PgAioHandle *ioh); +extern char *pgaio_io_get_target_description(PgAioHandle *ioh); + +/* functions in aio_callback.c */ +extern void pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id, + uint8 cb_data); +extern void pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len); +extern void pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len); +extern uint64 *pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len); + + + +/* -------------------------------------------------------------------------------- + * IO Wait References + * -------------------------------------------------------------------------------- + */ + +extern void pgaio_wref_clear(PgAioWaitRef *iow); +extern bool pgaio_wref_valid(PgAioWaitRef *iow); +extern int pgaio_wref_get_id(PgAioWaitRef *iow); + +extern void pgaio_wref_wait(PgAioWaitRef *iow); +extern bool pgaio_wref_check_done(PgAioWaitRef *iow); + + + +/* -------------------------------------------------------------------------------- + * IO Result + * -------------------------------------------------------------------------------- + */ + +extern void pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, + int elevel); + + + +/* -------------------------------------------------------------------------------- + * Actions on multiple IOs. + * -------------------------------------------------------------------------------- + */ + +extern void pgaio_enter_batchmode(void); +extern void pgaio_exit_batchmode(void); +extern void pgaio_submit_staged(void); +extern bool pgaio_have_staged(void); + + + +/* -------------------------------------------------------------------------------- + * Other + * -------------------------------------------------------------------------------- + */ + +extern void pgaio_closing_fd(int fd); + + /* GUCs */ extern PGDLLIMPORT int io_method; diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h new file mode 100644 index 00000000000..0ba3c4f1476 --- /dev/null +++ b/src/include/storage/aio_internal.h @@ -0,0 +1,395 @@ +/*------------------------------------------------------------------------- + * + * aio_internal.h + * AIO related declarations that should only be used by the AIO subsystem + * internally. + * + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/aio_internal.h + * + *------------------------------------------------------------------------- + */ +#ifndef AIO_INTERNAL_H +#define AIO_INTERNAL_H + + +#include "lib/ilist.h" +#include "port/pg_iovec.h" +#include "storage/aio.h" +#include "storage/condition_variable.h" + + +/* + * The maximum number of IOs that can be batch submitted at once. + */ +#define PGAIO_SUBMIT_BATCH_SIZE 32 + + + +/* + * State machine for handles. With some exceptions, noted below, handles move + * linearly through all states. + * + * State changes should all go through pgaio_io_update_state(). + */ +typedef enum PgAioHandleState +{ + /* not in use */ + PGAIO_HS_IDLE = 0, + + /* + * Returned by pgaio_io_acquire(). The next state is either DEFINED (if + * pgaio_io_prep_*() is called), or IDLE (if pgaio_io_release() is + * called). + */ + PGAIO_HS_HANDED_OUT, + + /* + * pgaio_io_prep_*() has been called, but IO is not yet staged. At this + * point the handle has all the information for the IO to be executed. + */ + PGAIO_HS_DEFINED, + + /* + * stage() callbacks have been called, handle ready to be submitted for + * execution. Unless in batchmode (see c.f. pgaio_enter_batchmode()), the + * IO will be submitted immediately after. + */ + PGAIO_HS_STAGED, + + /* IO has been submitted to the IO method for execution */ + PGAIO_HS_SUBMITTED, + + /* IO finished, but result has not yet been processed */ + PGAIO_HS_COMPLETED_IO, + + /* + * IO completed, shared completion has been called. + * + * If the IO completion occurs in the issuing backend, local callbacks + * will immediately be called. Otherwise the handle stays in + * COMPLETED_SHARED until the issuing backend waits for the completion of + * the IO. + */ + PGAIO_HS_COMPLETED_SHARED, + + /* + * IO completed, local completion has been called. + * + * After this the handle will be made reusable and go into IDLE state. + */ + PGAIO_HS_COMPLETED_LOCAL, +} PgAioHandleState; + + +struct ResourceOwnerData; + +/* typedef is in aio_types.h */ +struct PgAioHandle +{ + /* all state updates should go through pgaio_io_update_state() */ + PgAioHandleState state:8; + + /* what are we operating on */ + PgAioTargetID target:8; + + /* which IO operation */ + PgAioOp op:8; + + /* bitfield of PgAioHandleFlags */ + uint8 flags; + + uint8 num_callbacks; + + /* using the proper type here would use more space */ + uint8 callbacks[PGAIO_HANDLE_MAX_CALLBACKS]; + + /* data forwarded to each callback */ + uint8 callbacks_data[PGAIO_HANDLE_MAX_CALLBACKS]; + + /* + * Length of data associated with handle using + * pgaio_io_set_handle_data_*(). + */ + uint8 handle_data_len; + + /* XXX: could be optimized out with some pointer math */ + int32 owner_procno; + + /* raw result of the IO operation */ + int32 result; + + /** + * In which list the handle is registered, depends on the state: + * - IDLE, in per-backend list + * - HANDED_OUT - not in a list + * - DEFINED - not in a list + * - STAGED - in per-backend staged array + * - SUBMITTED - in issuer's in_flight list + * - COMPLETED_IO - in issuer's in_flight list + * - COMPLETED_SHARED - in issuer's in_flight list + **/ + dlist_node node; + + struct ResourceOwnerData *resowner; + dlist_node resowner_node; + + /* incremented every time the IO handle is reused */ + uint64 generation; + + /* + * To wait for the IO to complete other backends can wait on this CV. Note + * that, if in SUBMITTED state, a waiter first needs to check if it needs + * to do work via IoMethodOps->wait_one(). + */ + ConditionVariable cv; + + /* result of shared callback, passed to issuer callback */ + PgAioResult distilled_result; + + /* + * Index into PgAioCtl->iovecs and PgAioCtl->handle_data. + * + * At the moment there's no need to differentiate between the two, but + * that won't necessarily stay that way. + */ + uint32 iovec_off; + + /* + * If not NULL, this memory location will be updated with information + * about the IOs completion iff the issuing backend learns about the IOs + * completion. + */ + PgAioReturn *report_return; + + /* Data necessary for the IO to be performed */ + PgAioOpData op_data; + + /* + * Data necessary to identify the object undergoing IO to higher-level + * code. Needs to be sufficient to allow another backend to reopen the + * file. + */ + PgAioTargetData target_data; +}; + + +typedef struct PgAioBackend +{ + /* index into PgAioCtl->io_handles */ + uint32 io_handle_off; + + /* IO Handles that currently are not used */ + dclist_head idle_ios; + + /* + * Only one IO may be returned by pgaio_io_acquire()/pgaio_io_acquire_nb() + * without having been either defined (by actually associating it with IO) + * or released (with pgaio_io_release()). This restriction is necessary to + * guarantee that we always can acquire an IO. ->handed_out_io is used to + * enforce that rule. + */ + PgAioHandle *handed_out_io; + + /* Are we currently in batchmode? See pgaio_enter_batchmode(). */ + bool in_batchmode; + + /* + * IOs that are defined, but not yet submitted. + */ + uint16 num_staged_ios; + PgAioHandle *staged_ios[PGAIO_SUBMIT_BATCH_SIZE]; + + /* + * List of in-flight IOs. Also contains IOs that aren't strictly speaking + * in-flight anymore, but have been waited-for and completed by another + * backend. Once this backend sees such an IO it'll be reclaimed. + * + * The list is ordered by submission time, with more recently submitted + * IOs being appended at the end. + */ + dclist_head in_flight_ios; +} PgAioBackend; + + +typedef struct PgAioCtl +{ + int backend_state_count; + PgAioBackend *backend_state; + + /* + * Array of iovec structs. Each iovec is owned by a specific backend. The + * allocation is in PgAioCtl to allow the maximum number of iovecs for + * individual IOs to be configurable with PGC_POSTMASTER GUC. + */ + uint32 iovec_count; + struct iovec *iovecs; + + /* + * For, e.g., an IO covering multiple buffers in shared / temp buffers, we + * need to get Buffer IDs during completion to be able to change the + * BufferDesc state accordingly. This space can be used to store e.g. + * Buffer IDs. Note that the actual iovec might be shorter than this, + * because we combine neighboring pages into one larger iovec entry. + */ + uint64 *handle_data; + + uint32 io_handle_count; + PgAioHandle *io_handles; +} PgAioCtl; + + + +/* + * Callbacks used to implement an IO method. + */ +typedef struct IoMethodOps +{ + /* global initialization */ + + /* + * Amount of additional shared memory to reserve for the io_method. Called + * just like a normal ipci.c style *Size() function. Optional. + */ + size_t (*shmem_size) (void); + + /* + * Initialize shared memory. First time is true if AIO's shared memory was + * just initialized, false otherwise. Optional. + */ + void (*shmem_init) (bool first_time); + + /* + * Per-backend initialization. Optional. + */ + void (*init_backend) (void); + + + /* handling of IOs */ + + /* optional */ + bool (*needs_synchronous_execution) (PgAioHandle *ioh); + + /* + * Start executing passed in IOs. + * + * Will not be called if ->needs_synchronous_execution() returned true. + * + * num_staged_ios is <= PGAIO_SUBMIT_BATCH_SIZE. + * + * Always called in a critical section. + */ + int (*submit) (uint16 num_staged_ios, PgAioHandle **staged_ios); + + /* + * Wait for the IO to complete. Optional. + * + * If not provided, it needs to be guaranteed that the IO method calls + * pgaio_io_process_completion() without further interaction by the + * issuing backend. + */ + void (*wait_one) (PgAioHandle *ioh, + uint64 ref_generation); +} IoMethodOps; + + +/* aio.c */ +extern bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state); +extern void pgaio_io_stage(PgAioHandle *ioh, PgAioOp op); +extern void pgaio_io_process_completion(PgAioHandle *ioh, int result); +extern void pgaio_io_prepare_submit(PgAioHandle *ioh); +extern bool pgaio_io_needs_synchronous_execution(PgAioHandle *ioh); +extern const char *pgaio_io_get_state_name(PgAioHandle *ioh); +const char *pgaio_result_status_string(PgAioResultStatus rs); +extern void pgaio_shutdown(int code, Datum arg); + +/* aio_callback.c */ +extern void pgaio_io_call_stage(PgAioHandle *ioh); +extern void pgaio_io_call_complete_shared(PgAioHandle *ioh); +extern void pgaio_io_call_complete_local(PgAioHandle *ioh); + +/* aio_io.c */ +extern void pgaio_io_perform_synchronously(PgAioHandle *ioh); +extern const char *pgaio_io_get_op_name(PgAioHandle *ioh); + +/* aio_target.c */ +extern bool pgaio_io_can_reopen(PgAioHandle *ioh); +extern void pgaio_io_reopen(PgAioHandle *ioh); +extern const char *pgaio_io_get_target_name(PgAioHandle *ioh); + + +/* + * The AIO subsystem has fairly verbose debug logging support. This can be + * enabled/disabled at build time. The reason for this is that + * a) the verbosity can make debugging things on higher levels hard + * b) even if logging can be skipped due to elevel checks, it still causes a + * measurable slowdown + * + * XXX: This likely should be eventually be disabled by default, at least in + * non-assert builds. + */ +#define PGAIO_VERBOSE 1 + +/* + * Simple ereport() wrapper that only logs if PGAIO_VERBOSE is defined. + * + * This intentionally still compiles the code, guarded by a constant if (0), + * if verbose logging is disabled, to make it less likely that debug logging + * is silently broken. + * + * The current definition requires passing at least one argument. + */ +#define pgaio_debug(elevel, msg, ...) \ + do { \ + if (PGAIO_VERBOSE) \ + ereport(elevel, \ + errhidestmt(true), errhidecontext(true), \ + errmsg_internal(msg, \ + __VA_ARGS__)); \ + } while(0) + +/* + * Simple ereport() wrapper. Note that the definition requires passing at + * least one argument. + */ +#define pgaio_debug_io(elevel, ioh, msg, ...) \ + pgaio_debug(elevel, "io %-10d|op %-5s|target %-4s|state %-16s: " msg, \ + pgaio_io_get_id(ioh), \ + pgaio_io_get_op_name(ioh), \ + pgaio_io_get_target_name(ioh), \ + pgaio_io_get_state_name(ioh), \ + __VA_ARGS__) + + +#ifdef USE_INJECTION_POINTS + +extern void pgaio_io_call_inj(PgAioHandle *ioh, const char *injection_point); + +/* just for use in tests, from within injection points */ +extern PgAioHandle *pgaio_inj_io_get(void); + +#else + +#define pgaio_io_call_inj(ioh, injection_point) (void) 0 + +/* + * no fallback for pgaio_inj_io_get, all code using injection points better be + * guarded by USE_INJECTION_POINTS. + */ + +#endif + + +/* Declarations for the tables of function pointers exposed by each IO method. */ +extern PGDLLIMPORT const IoMethodOps pgaio_sync_ops; + +extern PGDLLIMPORT const IoMethodOps *pgaio_method_ops; +extern PGDLLIMPORT PgAioCtl *pgaio_ctl; +extern PGDLLIMPORT PgAioBackend *pgaio_my_backend; + + + +#endif /* AIO_INTERNAL_H */ diff --git a/src/include/storage/aio_types.h b/src/include/storage/aio_types.h new file mode 100644 index 00000000000..a5cc658efbd --- /dev/null +++ b/src/include/storage/aio_types.h @@ -0,0 +1,117 @@ +/*------------------------------------------------------------------------- + * + * aio_types.h + * AIO related types that are useful to include separately, to reduce the + * "include burden". + * + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/aio_types.h + * + *------------------------------------------------------------------------- + */ +#ifndef AIO_TYPES_H +#define AIO_TYPES_H + +#include "storage/block.h" +#include "storage/relfilelocator.h" + + +typedef struct PgAioHandle PgAioHandle; +typedef struct PgAioHandleCallbacks PgAioHandleCallbacks; +typedef struct PgAioTargetInfo PgAioTargetInfo; + +/* + * A reference to an IO that can be used to wait for the IO (using + * pgaio_wref_wait()) to complete. + * + * These can be passed across process boundaries. + */ +typedef struct PgAioWaitRef +{ + /* internal ID identifying the specific PgAioHandle */ + uint32 aio_index; + + /* + * IO handles are reused. To detect if a handle was reused, and thereby + * avoid unnecessarily waiting for a newer IO, each time the handle is + * reused a generation number is increased. + * + * To avoid requiring alignment sufficient for an int64, split the + * generation into two. + */ + uint32 generation_upper; + uint32 generation_lower; +} PgAioWaitRef; + + +/* + * Information identifying what the IO is being performed on. + * + * This needs sufficient information to + * + * a) Reopen the file for the IO if the IO is executed in a context that + * cannot use the FD provided initially (e.g. because the IO is executed in + * a worker process). + * + * b) Describe the object the IO is performed on in log / error messages. + */ +typedef union PgAioTargetData +{ + /* just as an example placeholder for later */ + struct + { + uint32 queue_id; + } wal; +} PgAioTargetData; + + +/* + * The status of an AIO operation. + */ +typedef enum PgAioResultStatus +{ + ARS_UNKNOWN, /* not yet completed / uninitialized */ + ARS_OK, + ARS_PARTIAL, /* did not fully succeed, but no error */ + ARS_ERROR, +} PgAioResultStatus; + + +/* + * Result of IO operation, visible only to the initiator of IO. + */ +typedef struct PgAioResult +{ + /* + * This is of type PgAioHandleCallbackID, but can't use a bitfield of an + * enum, because some compilers treat enums as signed. + */ + uint32 id:8; + + /* of type PgAioResultStatus, see above */ + uint32 status:2; + + /* meaning defined by callback->error */ + uint32 error_data:22; + + int32 result; +} PgAioResult; + + +/* + * Combination of PgAioResult with minimal metadata about the IO. + * + * Contains sufficient information to be able, in case the IO [partially] + * fails, to log/raise an error under control of the IO issuing code. + */ +typedef struct PgAioReturn +{ + PgAioResult result; + PgAioTargetData target_data; +} PgAioReturn; + + +#endif /* AIO_TYPES_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3c9e823f07e..f4261145353 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1280,6 +1280,7 @@ InvalMessageArray InvalidationInfo InvalidationMsgsGroup IoMethod +IoMethodOps IpcMemoryId IpcMemoryKey IpcMemoryState @@ -2127,6 +2128,26 @@ Permutation PermutationStep PermutationStepBlocker PermutationStepBlockerType +PgAioBackend +PgAioCtl +PgAioHandle +PgAioHandleCallbackID +PgAioHandleCallbackStage +PgAioHandleCallbackComplete +PgAioHandleCallbackReport +PgAioHandleCallbacks +PgAioHandleCallbacksEntry +PgAioHandleFlags +PgAioHandleState +PgAioOp +PgAioOpData +PgAioResult +PgAioResultStatus +PgAioReturn +PgAioTargetData +PgAioTargetID +PgAioTargetInfo +PgAioWaitRef PgArchData PgBackendGSSStatus PgBackendSSLStatus