1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-25 21:42:33 +03:00

aio: Basic subsystem initialization

This commit just does the minimal wiring up of the AIO subsystem, added in the
next commit, to the rest of the system. The next commit contains more details
about motivation and architecture.

This commit is kept separate to make review easier: it separates the changes
spread across the tree from the implementation of the new subsystem.

We discussed squashing this commit with the main commit before merging AIO,
but there has been a mild preference for keeping it separate.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Noah Misch <noah@leadboat.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
This commit is contained in:
Andres Freund 2025-03-17 18:51:33 -04:00
parent 65db3963ae
commit 02844012b3
24 changed files with 356 additions and 0 deletions

View File

@ -2638,6 +2638,57 @@ include_dir 'conf.d'
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-io-max-concurrency" xreflabel="io_max_concurrency">
<term><varname>io_max_concurrency</varname> (<type>integer</type>)
<indexterm>
<primary><varname>io_max_concurrency</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Controls the maximum number of I/O operations that one process can
execute simultaneously.
</para>
<para>
The default setting of <literal>-1</literal> selects a number based
on <xref linkend="guc-shared-buffers"/> and the maximum number of
processes (<xref linkend="guc-max-connections"/>, <xref
linkend="guc-autovacuum-worker-slots"/>, <xref
linkend="guc-max-worker-processes"/> and <xref
linkend="guc-max-wal-senders"/>), but not more than
<literal>64</literal>.
</para>
<para>
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-io-method" xreflabel="io_method">
<term><varname>io_method</varname> (<type>enum</type>)
<indexterm>
<primary><varname>io_method</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Selects the method for executing asynchronous I/O.
Possible values are:
<itemizedlist>
<listitem>
<para>
<literal>sync</literal> (execute asynchronous-eligible I/O synchronously)
</para>
</listitem>
</itemizedlist>
</para>
<para>
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>

View File

@ -51,6 +51,7 @@
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "storage/aio_subsys.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
@ -2411,6 +2412,8 @@ CommitTransaction(void)
RESOURCE_RELEASE_BEFORE_LOCKS,
true, true);
AtEOXact_Aio(true);
/* Check we've released all buffer pins */
AtEOXact_Buffers(true);
@ -2716,6 +2719,8 @@ PrepareTransaction(void)
RESOURCE_RELEASE_BEFORE_LOCKS,
true, true);
AtEOXact_Aio(true);
/* Check we've released all buffer pins */
AtEOXact_Buffers(true);
@ -2830,6 +2835,8 @@ AbortTransaction(void)
pgstat_report_wait_end();
pgstat_progress_end_command();
pgaio_error_cleanup();
/* Clean up buffer content locks, too */
UnlockBuffers();
@ -2960,6 +2967,7 @@ AbortTransaction(void)
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
false, true);
AtEOXact_Aio(false);
AtEOXact_Buffers(false);
AtEOXact_RelationCache(false);
AtEOXact_TypeCache();
@ -5232,6 +5240,9 @@ AbortSubTransaction(void)
pgstat_report_wait_end();
pgstat_progress_end_command();
pgaio_error_cleanup();
UnlockBuffers();
/* Reset WAL record construction state */
@ -5326,6 +5337,7 @@ AbortSubTransaction(void)
RESOURCE_RELEASE_BEFORE_LOCKS,
false, false);
AtEOXact_Aio(false);
AtEOSubXact_RelationCache(false, s->subTransactionId,
s->parent->subTransactionId);
AtEOSubXact_TypeCache();

View File

@ -88,6 +88,7 @@
#include "postmaster/autovacuum.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@ -465,6 +466,7 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
*/
LWLockReleaseAll();
pgstat_report_wait_end();
pgaio_error_cleanup();
UnlockBuffers();
/* this is probably dead code, but let's be safe: */
if (AuxProcessResourceOwner)

View File

@ -38,6 +38,7 @@
#include "postmaster/auxprocess.h"
#include "postmaster/bgwriter.h"
#include "postmaster/interrupt.h"
#include "storage/aio_subsys.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
@ -168,6 +169,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
*/
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgaio_error_cleanup();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);

View File

@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "postmaster/interrupt.h"
#include "replication/syncrep.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
@ -276,6 +277,7 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len)
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
pgaio_error_cleanup();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);

View File

@ -40,6 +40,7 @@
#include "postmaster/interrupt.h"
#include "postmaster/pgarch.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@ -568,6 +569,7 @@ pgarch_archiveXlog(char *xlog)
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
pgaio_error_cleanup();
ReleaseAuxProcessResources(false);
AtEOXact_Files(false);
AtEOXact_HashTables(false);

View File

@ -38,6 +38,7 @@
#include "postmaster/interrupt.h"
#include "postmaster/walsummarizer.h"
#include "replication/walreceiver.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@ -289,6 +290,7 @@ WalSummarizerMain(const void *startup_data, size_t startup_data_len)
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
pgaio_error_cleanup();
ReleaseAuxProcessResources(false);
AtEOXact_Files(false);
AtEOXact_HashTables(false);

View File

@ -51,6 +51,7 @@
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
@ -164,6 +165,7 @@ WalWriterMain(const void *startup_data, size_t startup_data_len)
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
pgaio_error_cleanup();
UnlockBuffers();
ReleaseAuxProcessResources(false);
AtEOXact_Buffers(false);

View File

@ -79,6 +79,7 @@
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@ -327,6 +328,7 @@ WalSndErrorCleanup(void)
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
pgaio_error_cleanup();
if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
wal_segment_close(xlogreader);

View File

@ -9,6 +9,8 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = \
aio.o \
aio_init.o \
read_stream.o
include $(top_srcdir)/src/backend/common.mk

View File

@ -0,0 +1,90 @@
/*-------------------------------------------------------------------------
*
* aio.c
* AIO - Core Logic
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/aio.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "lib/ilist.h"
#include "storage/aio.h"
#include "storage/aio_subsys.h"
#include "utils/guc.h"
#include "utils/guc_hooks.h"
/*
 * Options for io_method.
 *
 * Maps the names accepted for the io_method GUC to IoMethod values; this
 * table is what the io_method entry in ConfigureNamesEnum points at. "sync"
 * is the only method listed. The trailing {NULL, 0, false} entry is
 * presumably the end-of-list sentinel expected by the GUC machinery.
 */
const struct config_enum_entry io_method_options[] = {
{"sync", IOMETHOD_SYNC, false},
{NULL, 0, false}
};
/*
 * GUCs
 *
 * io_method: the selected method for executing asynchronous I/O, one of the
 * values listed in io_method_options.
 *
 * io_max_concurrency: maximum number of IOs that one process can execute
 * simultaneously. -1 requests auto-tuning, which is applied later during
 * startup (see check_io_max_concurrency()).
 */
int io_method = DEFAULT_IO_METHOD;
int io_max_concurrency = -1;
/*
 * Release IO handle during resource owner cleanup.
 *
 * ioh_node is the handle's list node, taken from the head of a resource
 * owner's aio_handles list. on_error is passed as !isCommit by the resource
 * owner release code.
 *
 * The body is empty in this file; neither argument is used.
 *
 * NOTE(review): ResourceOwnerReleaseInternal() calls this in a loop that
 * runs until the owner's aio_handles list is empty, so an implementation has
 * to unlink ioh_node from that list. With the body empty, that loop could not
 * terminate if a handle were ever registered.
 */
void
pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
{
}
/*
 * Perform AIO related cleanup after an error.
 *
 * This should be called early in the error recovery paths, as later steps may
 * need to issue AIO (e.g. to record a transaction abort WAL record).
 *
 * Takes no arguments and returns nothing. The body is empty in this file, so
 * calling it currently has no effect.
 */
void
pgaio_error_cleanup(void)
{
}
/*
 * Perform AIO related checks at (sub-)transactional boundaries.
 *
 * This should be called late during (sub-)transactional commit/abort, after
 * all steps that might need to perform AIO, so that we can verify that the
 * AIO subsystem is in a valid state at the end of a transaction.
 *
 * is_commit is true when called from the commit/prepare paths and false when
 * called from the abort paths. The body is empty in this file, so is_commit
 * is not examined and no checks are performed.
 */
void
AtEOXact_Aio(bool is_commit)
{
}
/*
 * GUC assign hook for io_method.
 *
 * newval is the value being assigned to io_method; extra is the hook's
 * opaque extra pointer. The body is empty in this file, so both arguments
 * are ignored and the hook has no effect.
 */
void
assign_io_method(int newval, void *extra)
{
}
/*
 * GUC check hook for io_max_concurrency.
 *
 * Accepts every proposed value except 0, for which an error detail is set
 * and false is returned. *newval is never modified; extra and source are
 * not used.
 */
bool
check_io_max_concurrency(int *newval, void **extra, GucSource source)
{
	/* Zero is the one value this hook refuses. */
	if (*newval == 0)
	{
		GUC_check_errdetail("Only -1 or values bigger than 0 are valid.");
		return false;
	}

	/*
	 * Anything else is accepted as-is. In particular -1 is left untouched
	 * here: auto-tuning is applied later during startup, because it depends
	 * on the value of various other GUCs.
	 */
	return true;
}

View File

@ -0,0 +1,37 @@
/*-------------------------------------------------------------------------
*
* aio_init.c
* AIO - Subsystem Initialization
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/aio_init.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/aio_subsys.h"
Size
AioShmemSize(void)
{
Size sz = 0;
return sz;
}
/*
 * Initialize the AIO subsystem's shared memory state.
 *
 * Called from CreateOrAttachShmemStructs(). The body is empty in this file,
 * so nothing is initialized.
 */
void
AioShmemInit(void)
{
}
/*
 * Per-backend initialization of the AIO subsystem.
 *
 * Called from BaseInit() before the storage and buffer managers are
 * initialized, so that AIO is set up before infrastructure that might need
 * to execute it. The body is empty in this file, so the call has no effect.
 */
void
pgaio_init_backend(void)
{
}

View File

@ -1,5 +1,7 @@
# Copyright (c) 2024-2025, PostgreSQL Global Development Group
backend_sources += files(
'aio.c',
'aio_init.c',
'read_stream.c',
)

View File

@ -37,6 +37,7 @@
#include "replication/slotsync.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/dsm_registry.h"
@ -148,6 +149,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, WaitEventCustomShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
size = add_size(size, AioShmemSize());
/* include additional requested shmem from preload libraries */
size = add_size(size, total_addin_request);
@ -340,6 +342,7 @@ CreateOrAttachShmemStructs(void)
StatsShmemInit();
WaitEventCustomShmemInit();
InjectionPointShmemInit();
AioShmemInit();
}
/*

View File

@ -43,6 +43,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/walsender.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@ -635,6 +636,12 @@ BaseInit(void)
*/
pgstat_initialize();
/*
* Initialize AIO before infrastructure that might need to actually
* execute AIO.
*/
pgaio_init_backend();
/* Do local initialization of storage and buffer managers */
InitSync();
smgrinit();

View File

@ -72,6 +72,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/large_object.h"
@ -3254,6 +3255,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
{
{"io_max_concurrency",
PGC_POSTMASTER,
RESOURCES_IO,
gettext_noop("Max number of IOs that one process can execute simultaneously."),
NULL,
},
&io_max_concurrency,
-1, -1, 1024,
check_io_max_concurrency, NULL, NULL
},
{
{"backend_flush_after", PGC_USERSET, RESOURCES_IO,
gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
@ -5311,6 +5324,16 @@ struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},
{
{"io_method", PGC_POSTMASTER, RESOURCES_IO,
gettext_noop("Selects the method for executing asynchronous I/O."),
NULL
},
&io_method,
DEFAULT_IO_METHOD, io_method_options,
NULL, assign_io_method, NULL
},
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL

View File

@ -202,6 +202,12 @@
#maintenance_io_concurrency = 10 # 1-1000; 0 disables prefetching
#io_combine_limit = 128kB # usually 1-32 blocks (depends on OS)
#io_method = sync # sync (change requires restart)
#io_max_concurrency = -1 # Max number of IOs that one process
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
# - Worker Processes -
#max_worker_processes = 8 # (change requires restart)

View File

@ -47,6 +47,8 @@
#include "common/hashfn.h"
#include "common/int.h"
#include "lib/ilist.h"
#include "storage/aio.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
@ -155,6 +157,12 @@ struct ResourceOwnerData
/* The local locks cache. */
LOCALLOCK *locks[MAX_RESOWNER_LOCKS]; /* list of owned locks */
/*
* AIO handles need to be registered in critical sections and therefore
* cannot use the normal ResourceElem mechanism.
*/
dlist_head aio_handles;
};
@ -425,6 +433,8 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
parent->firstchild = owner;
}
dlist_init(&owner->aio_handles);
return owner;
}
@ -725,6 +735,13 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
* so issue warnings. In the abort case, just clean up quietly.
*/
ResourceOwnerReleaseAll(owner, phase, isCommit);
while (!dlist_is_empty(&owner->aio_handles))
{
dlist_node *node = dlist_head_node(&owner->aio_handles);
pgaio_io_release_resowner(node, !isCommit);
}
}
else if (phase == RESOURCE_RELEASE_LOCKS)
{
@ -1082,3 +1099,15 @@ ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
elog(ERROR, "lock reference %p is not owned by resource owner %s",
locallock, owner->name);
}
/*
 * Remember that an AIO handle is owned by a ResourceOwner.
 *
 * The handle's list node is appended to the tail of the owner's aio_handles
 * list. AIO handles are tracked in this dedicated list rather than through
 * the normal ResourceElem mechanism.
 */
void
ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)
{
	dlist_head *handles = &owner->aio_handles;

	dlist_push_tail(handles, ioh_node);
}
/*
 * Forget that an AIO handle is owned by a ResourceOwner.
 *
 * The handle's list node is removed from the owner's aio_handles list.
 */
void
ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)
{
	dlist_head *handles = &owner->aio_handles;

	dlist_delete_from(handles, ioh_node);
}

38
src/include/storage/aio.h Normal file
View File

@ -0,0 +1,38 @@
/*-------------------------------------------------------------------------
*
* aio.h
* Main AIO interface
*
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/aio.h
*
*-------------------------------------------------------------------------
*/
#ifndef AIO_H
#define AIO_H
/*
 * Enum for io_method GUC.
 *
 * These are the values stored in the io_method variable and listed in
 * io_method_options[].
 */
typedef enum IoMethod
{
IOMETHOD_SYNC = 0,	/* execute asynchronous-eligible I/O synchronously */
} IoMethod;
/* We'll default to synchronous execution. */
#define DEFAULT_IO_METHOD IOMETHOD_SYNC
struct dlist_node;
extern void pgaio_io_release_resowner(struct dlist_node *ioh_node, bool on_error);
/* GUCs */
extern PGDLLIMPORT int io_method;
extern PGDLLIMPORT int io_max_concurrency;
#endif /* AIO_H */

View File

@ -0,0 +1,33 @@
/*-------------------------------------------------------------------------
*
* aio_subsys.h
* Interaction with AIO as a subsystem, rather than actually issuing AIO
*
* This header is for AIO related functionality that's being called by files
* that don't perform AIO, but interact with the AIO subsystem in some
* form. E.g. postmaster.c and shared memory initialization need to initialize
* AIO but don't perform AIO.
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/aio_subsys.h
*
*-------------------------------------------------------------------------
*/
#ifndef AIO_SUBSYS_H
#define AIO_SUBSYS_H
/* aio_init.c */
extern Size AioShmemSize(void);
extern void AioShmemInit(void);
extern void pgaio_init_backend(void);
/* aio.c */
extern void pgaio_error_cleanup(void);
extern void AtEOXact_Aio(bool is_commit);
#endif /* AIO_SUBSYS_H */

View File

@ -318,6 +318,7 @@ extern PGDLLIMPORT bool optimize_bounded_sort;
*/
extern PGDLLIMPORT const struct config_enum_entry archive_mode_options[];
extern PGDLLIMPORT const struct config_enum_entry dynamic_shared_memory_options[];
extern PGDLLIMPORT const struct config_enum_entry io_method_options[];
extern PGDLLIMPORT const struct config_enum_entry recovery_target_action_options[];
extern PGDLLIMPORT const struct config_enum_entry wal_level_options[];
extern PGDLLIMPORT const struct config_enum_entry wal_sync_method_options[];

View File

@ -64,6 +64,8 @@ extern bool check_default_with_oids(bool *newval, void **extra,
extern bool check_effective_io_concurrency(int *newval, void **extra,
GucSource source);
extern bool check_huge_page_size(int *newval, void **extra, GucSource source);
extern void assign_io_method(int newval, void *extra);
extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source);
extern const char *show_in_hot_standby(void);
extern bool check_locale_messages(char **newval, void **extra, GucSource source);
extern void assign_locale_messages(const char *newval, void *extra);

View File

@ -164,4 +164,9 @@ struct LOCALLOCK;
extern void ResourceOwnerRememberLock(ResourceOwner owner, struct LOCALLOCK *locallock);
extern void ResourceOwnerForgetLock(ResourceOwner owner, struct LOCALLOCK *locallock);
/* special support for AIO */
struct dlist_node;
extern void ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node);
extern void ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node);
#endif /* RESOWNER_H */

View File

@ -1279,6 +1279,7 @@ IntoClause
InvalMessageArray
InvalidationInfo
InvalidationMsgsGroup
IoMethod
IpcMemoryId
IpcMemoryKey
IpcMemoryState