aio: Add io_method=io_uring
Performing AIO using io_uring can be considerably faster than io_method=worker, particularly when lots of small IOs are issued, as a) the context-switch overhead for worker based AIO becomes more significant and b) the number of IO workers can become limiting.

io_uring, however, is Linux specific and requires an additional compile-time dependency (liburing).

This implementation is fairly simple and there are substantial optimization opportunities.

The description of the existing AIO_IO_COMPLETION wait event is updated to make the difference between it and the new AIO_IO_URING_EXECUTION clearer.

Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Jakub Wartak <jakub.wartak@enterprisedb.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
This commit is contained in:
parent 8eadd5c73c
commit c325a7633f
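
For readers unfamiliar with liburing, the cycle that method_io_uring.c drives below (set up a ring, fill a submission queue entry, submit, wait for the completion queue entry) can be seen in a minimal standalone sketch. This is illustrative only, not code from the commit; the queue depth and file path are arbitrary, and it should build with something like cc demo.c -o demo -luring on a system with liburing installed.

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <liburing.h>

int
main(void)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    char        buf[4096];
    int         fd;

    /* one ring per process, analogous to one ring per backend below */
    if (io_uring_queue_init(64, &ring, 0) < 0)
        exit(1);

    fd = open("/etc/hostname", O_RDONLY);
    if (fd < 0)
        exit(1);

    /* stage a read at offset 0; the patch does this in pgaio_uring_sq_from_io() */
    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);

    /* hand all staged SQEs to the kernel, as pgaio_uring_submit() does */
    io_uring_submit(&ring);

    /* block until a CQE arrives; cqe->res is the read's result or -errno */
    io_uring_wait_cqe(&ring, &cqe);
    printf("read returned %d\n", cqe->res);
    io_uring_cqe_seen(&ring, cqe);

    close(fd);
    io_uring_queue_exit(&ring);
    return 0;
}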
.cirrus.tasks.yml
@@ -493,11 +493,14 @@ task:
   # - Uses undefined behaviour and alignment sanitizers, sanitizer failures
   #   are typically printed in the server log
   # - Test both 64bit and 32 bit builds
+  # - uses io_method=io_uring
   - name: Linux - Debian Bookworm - Meson

     env:
       CCACHE_MAXSIZE: "400M" # tests two different builds
       SANITIZER_FLAGS: -fsanitize=alignment,undefined
+      PG_TEST_INITDB_EXTRA_OPTS: >-
+        -c io_method=io_uring

     configure_script: |
       su postgres <<-EOF
doc/src/sgml/config.sgml
@@ -2710,6 +2710,14 @@ include_dir 'conf.d'
         <literal>worker</literal> (execute asynchronous I/O using worker processes)
        </para>
       </listitem>
+      <listitem>
+       <para>
+        <literal>io_uring</literal> (execute asynchronous I/O using
+        io_uring, requires a build with
+        <link linkend="configure-option-with-liburing"><option>--with-liburing</option></link> /
+        <link linkend="configure-with-liburing-meson"><option>-Dliburing</option></link>)
+       </para>
+      </listitem>
       <listitem>
        <para>
         <literal>sync</literal> (execute asynchronous-eligible I/O synchronously)
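Assuming a server built with liburing support as documented above, selecting the new method is then a one-line configuration change. A hypothetical postgresql.conf excerpt, following the sample-file conventions shown further down in this commit:

io_method = io_uring			# Linux only; requires a --with-liburing /
					# -Dliburing build (change requires restart)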
src/backend/storage/aio/Makefile
@@ -14,6 +14,7 @@ OBJS = \
 	aio_init.o \
 	aio_io.o \
 	aio_target.o \
+	method_io_uring.o \
 	method_sync.o \
 	method_worker.o \
 	read_stream.o
src/backend/storage/aio/aio.c
@@ -65,6 +65,9 @@ static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
 const struct config_enum_entry io_method_options[] = {
 	{"sync", IOMETHOD_SYNC, false},
 	{"worker", IOMETHOD_WORKER, false},
+#ifdef IOMETHOD_IO_URING_ENABLED
+	{"io_uring", IOMETHOD_IO_URING, false},
+#endif
 	{NULL, 0, false}
 };

@@ -82,6 +85,9 @@ PgAioBackend *pgaio_my_backend;
 static const IoMethodOps *const pgaio_method_ops_table[] = {
 	[IOMETHOD_SYNC] = &pgaio_sync_ops,
 	[IOMETHOD_WORKER] = &pgaio_worker_ops,
+#ifdef IOMETHOD_IO_URING_ENABLED
+	[IOMETHOD_IO_URING] = &pgaio_uring_ops,
+#endif
 };

 /* callbacks for the configured io_method, set by assign_io_method */
@@ -1118,6 +1124,41 @@ pgaio_closing_fd(int fd)
 	 * it's probably not worth it.
 	 */
 	pgaio_submit_staged();
+
+	/*
+	 * If requested by the IO method, wait for all IOs that use the
+	 * to-be-closed FD.
+	 */
+	if (pgaio_method_ops->wait_on_fd_before_close)
+	{
+		/*
+		 * As waiting for one IO to complete may complete multiple IOs, we
+		 * can't just use a mutable list iterator. The maximum number of
+		 * in-flight IOs is fairly small, so just restart the loop after
+		 * waiting for an IO.
+		 */
+		while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
+		{
+			dlist_iter	iter;
+			PgAioHandle *ioh = NULL;
+
+			dclist_foreach(iter, &pgaio_my_backend->in_flight_ios)
+			{
+				ioh = dclist_container(PgAioHandle, node, iter.cur);
+
+				if (pgaio_io_uses_fd(ioh, fd))
+					break;
+				else
+					ioh = NULL;
+			}
+
+			if (!ioh)
+				break;
+
+			/* see comment in pgaio_io_wait_for_free() about raciness */
+			pgaio_io_wait(ioh, ioh->generation);
+		}
+	}
 }

 /*
src/backend/storage/aio/aio_io.c
@@ -188,3 +188,25 @@ pgaio_io_get_op_name(PgAioHandle *ioh)

 	return NULL;				/* silence compiler */
 }
+
+/*
+ * Used to determine if an IO needs to be waited upon before the file
+ * descriptor can be closed.
+ */
+bool
+pgaio_io_uses_fd(PgAioHandle *ioh, int fd)
+{
+	Assert(ioh->state >= PGAIO_HS_DEFINED);
+
+	switch (ioh->op)
+	{
+		case PGAIO_OP_READV:
+			return ioh->op_data.read.fd == fd;
+		case PGAIO_OP_WRITEV:
+			return ioh->op_data.write.fd == fd;
+		case PGAIO_OP_INVALID:
+			return false;
+	}
+
+	return false;				/* silence compiler */
+}
src/backend/storage/aio/meson.build
@@ -6,6 +6,7 @@ backend_sources += files(
   'aio_init.c',
   'aio_io.c',
   'aio_target.c',
+  'method_io_uring.c',
   'method_sync.c',
   'method_worker.c',
   'read_stream.c',
src/backend/storage/aio/method_io_uring.c (new file, 484 lines)
@@ -0,0 +1,484 @@
/*-------------------------------------------------------------------------
 *
 * method_io_uring.c
 *    AIO - perform AIO using Linux' io_uring
 *
 * For now we create one io_uring instance for each backend. These io_uring
 * instances have to be created in postmaster, during startup, to allow other
 * backends to process IO completions, if the issuing backend is currently
 * busy doing other things. Other backends may not use another backend's
 * io_uring instance to submit IO, that'd require additional locking that
 * would likely be harmful for performance.
 *
 * We likely will want to introduce a backend-local io_uring instance in the
 * future, e.g. for FE/BE network IO.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/method_io_uring.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

/* included early, for IOMETHOD_IO_URING_ENABLED */
#include "storage/aio.h"

#ifdef IOMETHOD_IO_URING_ENABLED

#include <liburing.h>

#include "miscadmin.h"
#include "storage/aio_internal.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/lwlock.h"
#include "storage/procnumber.h"
#include "utils/wait_event.h"


/* number of completions processed at once */
#define PGAIO_MAX_LOCAL_COMPLETED_IO 32


/* Entry points for IoMethodOps. */
static size_t pgaio_uring_shmem_size(void);
static void pgaio_uring_shmem_init(bool first_time);
static void pgaio_uring_init_backend(void);
static int	pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);

/* helper functions */
static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe);


const IoMethodOps pgaio_uring_ops = {
	/*
	 * While io_uring mostly is OK with FDs getting closed while the IO is in
	 * flight, that is not true for IOs submitted with IOSQE_ASYNC.
	 *
	 * See
	 * https://postgr.es/m/5ons2rtmwarqqhhexb3dnqulw5rjgwgoct57vpdau4rujlrffj%403fls6d2mkiwc
	 */
	.wait_on_fd_before_close = true,

	.shmem_size = pgaio_uring_shmem_size,
	.shmem_init = pgaio_uring_shmem_init,
	.init_backend = pgaio_uring_init_backend,

	.submit = pgaio_uring_submit,
	.wait_one = pgaio_uring_wait_one,
};

/*
 * Per-backend state when using io_method=io_uring
 *
 * Align the whole struct to a cacheline boundary, to prevent false sharing
 * between completion_lock and prior backend's io_uring_ring.
 */
typedef struct pg_attribute_aligned (PG_CACHE_LINE_SIZE)
PgAioUringContext
{
	/*
	 * Multiple backends can process completions for this backend's io_uring
	 * instance (e.g. when the backend issuing IO is busy doing something
	 * else). To make that safe we have to ensure that only a single backend
	 * gets io completions from the io_uring instance at a time.
	 */
	LWLock		completion_lock;

	struct io_uring io_uring_ring;
} PgAioUringContext;

/* PgAioUringContexts for all backends */
static PgAioUringContext *pgaio_uring_contexts;

/* the current backend's context */
static PgAioUringContext *pgaio_my_uring_context;


static uint32
pgaio_uring_procs(void)
{
	/*
	 * We can subtract MAX_IO_WORKERS here as io workers are never used at the
	 * same time as io_method=io_uring.
	 */
	return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
}

static Size
pgaio_uring_context_shmem_size(void)
{
	return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
}

static size_t
pgaio_uring_shmem_size(void)
{
	return pgaio_uring_context_shmem_size();
}

static void
pgaio_uring_shmem_init(bool first_time)
{
	int			TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
	bool		found;

	pgaio_uring_contexts = (PgAioUringContext *)
		ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);

	if (found)
		return;

	for (int contextno = 0; contextno < TotalProcs; contextno++)
	{
		PgAioUringContext *context = &pgaio_uring_contexts[contextno];
		int			ret;

		/*
		 * Right now a high TotalProcs will cause problems in two ways:
		 *
		 * - RLIMIT_NOFILE needs to be big enough to allow all
		 *   io_uring_queue_init() calls to succeed.
		 *
		 * - RLIMIT_NOFILE needs to be big enough to still have enough file
		 *   descriptors to satisfy set_max_safe_fds() left over. Or, even
		 *   better, have max_files_per_process left over FDs.
		 *
		 * We probably should adjust the soft RLIMIT_NOFILE to ensure that.
		 *
		 *
		 * XXX: Newer versions of io_uring support sharing the workers that
		 * execute some asynchronous IOs between io_uring instances. It might
		 * be worth using that - also need to evaluate if that causes
		 * noticeable additional contention?
		 */
		ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
		if (ret < 0)
		{
			char	   *hint = NULL;
			int			err = ERRCODE_INTERNAL_ERROR;

			/* add hints for some failures that errno explains sufficiently */
			if (-ret == EPERM)
			{
				err = ERRCODE_INSUFFICIENT_PRIVILEGE;
				hint = _("Check if io_uring is disabled via /proc/sys/kernel/io_uring_disabled.");
			}
			else if (-ret == EMFILE)
			{
				err = ERRCODE_INSUFFICIENT_RESOURCES;
				hint = psprintf(_("Consider increasing \"ulimit -n\" to at least %d."),
								TotalProcs + max_files_per_process);
			}
			else if (-ret == ENOSYS)
			{
				err = ERRCODE_FEATURE_NOT_SUPPORTED;
				hint = _("Kernel does not support io_uring.");
			}

			/* update errno to allow %m to work */
			errno = -ret;

			ereport(ERROR,
					errcode(err),
					errmsg("could not setup io_uring queue: %m"),
					hint != NULL ? errhint("%s", hint) : 0);
		}

		LWLockInitialize(&context->completion_lock, LWTRANCHE_AIO_URING_COMPLETION);
	}
}

static void
pgaio_uring_init_backend(void)
{
	Assert(MyProcNumber < pgaio_uring_procs());

	pgaio_my_uring_context = &pgaio_uring_contexts[MyProcNumber];
}

static int
pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
	struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
	int			in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);

	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

	for (int i = 0; i < num_staged_ios; i++)
	{
		PgAioHandle *ioh = staged_ios[i];
		struct io_uring_sqe *sqe;

		sqe = io_uring_get_sqe(uring_instance);

		if (!sqe)
			elog(ERROR, "io_uring submission queue is unexpectedly full");

		pgaio_io_prepare_submit(ioh);
		pgaio_uring_sq_from_io(ioh, sqe);

		/*
		 * io_uring executes IO in process context if possible. That's
		 * generally good, as it reduces context switching. When performing a
		 * lot of buffered IO that means that copying between page cache and
		 * userspace memory happens in the foreground, as it can't be
		 * offloaded to DMA hardware as is possible when using direct IO. When
		 * executing a lot of buffered IO this causes io_uring to be slower
		 * than worker mode, as worker mode parallelizes the copying. io_uring
		 * can be told to offload work to worker threads instead.
		 *
		 * If an IO is buffered IO and we already have IOs in flight or
		 * multiple IOs are being submitted, we thus tell io_uring to execute
		 * the IO in the background. We don't do so for the first few IOs
		 * being submitted as executing in this process' context has lower
		 * latency.
		 */
		if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
			io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);

		in_flight_before++;
	}

	while (true)
	{
		int			ret;

		pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_SUBMIT);
		ret = io_uring_submit(uring_instance);
		pgstat_report_wait_end();

		if (ret == -EINTR)
		{
			pgaio_debug(DEBUG3,
						"aio method uring: submit EINTR, nios: %d",
						num_staged_ios);
		}
		else if (ret < 0)
		{
			/*
			 * The io_uring_enter() manpage suggests that the appropriate
			 * reaction to EAGAIN is:
			 *
			 * "The application should wait for some completions and try
			 * again"
			 *
			 * However, it seems unlikely that that would help in our case, as
			 * we apply a low limit to the number of outstanding IOs and thus
			 * also outstanding completions, making it unlikely that we'd get
			 * EAGAIN while the OS is in good working order.
			 *
			 * Additionally, it would be problematic to just wait here, our
			 * caller might hold critical locks. It'd possibly lead to
			 * delaying the crash-restart that seems likely to occur when the
			 * kernel is under such heavy memory pressure.
			 *
			 * Update errno to allow %m to work.
			 */
			errno = -ret;
			elog(PANIC, "io_uring submit failed: %m");
		}
		else if (ret != num_staged_ios)
		{
			/* likely unreachable, but if it is, we would need to re-submit */
			elog(PANIC, "io_uring submit submitted only %d of %d",
				 ret, num_staged_ios);
		}
		else
		{
			pgaio_debug(DEBUG4,
						"aio method uring: submitted %d IOs",
						num_staged_ios);
			break;
		}
	}

	return num_staged_ios;
}

static void
pgaio_uring_drain_locked(PgAioUringContext *context)
{
	int			ready;
	int			orig_ready;

	Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));

	/*
	 * Don't drain more events than available right now. Otherwise it's
	 * plausible that one backend could get stuck, for a while, receiving CQEs
	 * without actually processing them.
	 */
	orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);

	while (ready > 0)
	{
		struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
		uint32		ncqes;

		START_CRIT_SECTION();
		ncqes =
			io_uring_peek_batch_cqe(&context->io_uring_ring,
									cqes,
									Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
		Assert(ncqes <= ready);

		ready -= ncqes;

		for (int i = 0; i < ncqes; i++)
		{
			struct io_uring_cqe *cqe = cqes[i];
			PgAioHandle *ioh;

			ioh = io_uring_cqe_get_data(cqe);
			io_uring_cqe_seen(&context->io_uring_ring, cqe);

			pgaio_io_process_completion(ioh, cqe->res);
		}

		END_CRIT_SECTION();

		pgaio_debug(DEBUG3,
					"drained %d/%d, now expecting %d",
					ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
	}
}

static void
pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
{
	PgAioHandleState state;
	ProcNumber	owner_procno = ioh->owner_procno;
	PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
	bool		expect_cqe;
	int			waited = 0;

	/*
	 * XXX: It would be nice to have a smarter locking scheme, nearly all the
	 * time the backend owning the ring will consume the completions, making
	 * the locking unnecessarily expensive.
	 */
	LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);

	while (true)
	{
		pgaio_debug_io(DEBUG3, ioh,
					   "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
					   (long long unsigned) ioh->generation,
					   (long long unsigned) ref_generation,
					   waited);

		if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
			state != PGAIO_HS_SUBMITTED)
		{
			/* the IO was completed by another backend */
			break;
		}
		else if (io_uring_cq_ready(&owner_context->io_uring_ring))
		{
			/* no need to wait in the kernel, io_uring has a completion */
			expect_cqe = true;
		}
		else
		{
			int			ret;
			struct io_uring_cqe *cqes;

			/* need to wait in the kernel */
			pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_EXECUTION);
			ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
			pgstat_report_wait_end();

			if (ret == -EINTR)
			{
				continue;
			}
			else if (ret != 0)
			{
				/* see comment after io_uring_submit() */
				errno = -ret;
				elog(PANIC, "io_uring wait failed: %m");
			}
			else
			{
				Assert(cqes != NULL);
				expect_cqe = true;
				waited++;
			}
		}

		if (expect_cqe)
		{
			pgaio_uring_drain_locked(owner_context);
		}
	}

	LWLockRelease(&owner_context->completion_lock);

	pgaio_debug(DEBUG3,
				"wait_one with %d sleeps",
				waited);
}

static void
pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
{
	struct iovec *iov;

	switch (ioh->op)
	{
		case PGAIO_OP_READV:
			iov = &pgaio_ctl->iovecs[ioh->iovec_off];
			if (ioh->op_data.read.iov_length == 1)
			{
				io_uring_prep_read(sqe,
								   ioh->op_data.read.fd,
								   iov->iov_base,
								   iov->iov_len,
								   ioh->op_data.read.offset);
			}
			else
			{
				io_uring_prep_readv(sqe,
									ioh->op_data.read.fd,
									iov,
									ioh->op_data.read.iov_length,
									ioh->op_data.read.offset);
			}
			break;

		case PGAIO_OP_WRITEV:
			iov = &pgaio_ctl->iovecs[ioh->iovec_off];
			if (ioh->op_data.write.iov_length == 1)
			{
				io_uring_prep_write(sqe,
									ioh->op_data.write.fd,
									iov->iov_base,
									iov->iov_len,
									ioh->op_data.write.offset);
			}
			else
			{
				io_uring_prep_writev(sqe,
									 ioh->op_data.write.fd,
									 iov,
									 ioh->op_data.write.iov_length,
									 ioh->op_data.write.offset);
			}
			break;

		case PGAIO_OP_INVALID:
			elog(ERROR, "trying to prepare invalid IO operation for execution");
	}

	io_uring_sqe_set_data(sqe, ioh);
}

#endif							/* IOMETHOD_IO_URING_ENABLED */
src/backend/storage/lmgr/lwlock.c
@@ -177,6 +177,7 @@ static const char *const BuiltinTrancheNames[] = {
 	[LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
 	[LWTRANCHE_XACT_SLRU] = "XactSLRU",
 	[LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA",
+	[LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion",
 };

 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
src/backend/utils/activity/wait_event_names.txt
@@ -192,7 +192,9 @@ ABI_compatibility:

 Section: ClassName - WaitEventIO

-AIO_IO_COMPLETION	"Waiting for IO completion."
+AIO_IO_COMPLETION	"Waiting for another process to complete IO."
+AIO_IO_URING_SUBMIT	"Waiting for IO submission via io_uring."
+AIO_IO_URING_EXECUTION	"Waiting for IO execution via io_uring."
 BASEBACKUP_READ	"Waiting for base backup to read from a file."
 BASEBACKUP_SYNC	"Waiting for data written by a base backup to reach durable storage."
 BASEBACKUP_WRITE	"Waiting for base backup to write to a file."
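The renamed AIO_IO_COMPLETION event and the two new io_uring events surface in pg_stat_activity. A monitoring query might look like the following sketch; the CamelCase spellings are assumed from the usual display-name generation scheme, not spelled out in this hunk:

SELECT pid, wait_event_type, wait_event
FROM pg_stat_activity
WHERE wait_event_type = 'IO'
  AND wait_event IN ('AioIoUringSubmit', 'AioIoUringExecution');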
src/backend/utils/misc/postgresql.conf.sample
@@ -204,7 +204,8 @@
 					# (change requires restart)
 #io_combine_limit = 128kB		# usually 1-128 blocks (depends on OS)

-#io_method = worker			# worker, sync (change requires restart)
+#io_method = worker			# worker, io_uring, sync
+					# (change requires restart)
 #io_max_concurrency = -1		# Max number of IOs that one process
 					# can execute simultaneously
 					# -1 sets based on shared_buffers
src/include/storage/aio.h
@@ -22,12 +22,20 @@
 #include "storage/procnumber.h"


+/* io_uring is incompatible with EXEC_BACKEND */
+#if defined(USE_LIBURING) && !defined(EXEC_BACKEND)
+#define IOMETHOD_IO_URING_ENABLED
+#endif
+
+
 /* Enum for io_method GUC. */
 typedef enum IoMethod
 {
 	IOMETHOD_SYNC = 0,
 	IOMETHOD_WORKER,
+#ifdef IOMETHOD_IO_URING_ENABLED
+	IOMETHOD_IO_URING,
+#endif
 } IoMethod;

 /* We'll default to worker based execution. */
src/include/storage/aio_internal.h
@@ -248,6 +248,15 @@ typedef struct PgAioCtl
  */
 typedef struct IoMethodOps
 {
+	/* properties */
+
+	/*
+	 * If an FD is about to be closed, do we need to wait for all in-flight
+	 * IOs referencing that FD?
+	 */
+	bool		wait_on_fd_before_close;
+
+
 	/* global initialization */

 	/*
|
|||||||
/* aio_io.c */
|
/* aio_io.c */
|
||||||
extern void pgaio_io_perform_synchronously(PgAioHandle *ioh);
|
extern void pgaio_io_perform_synchronously(PgAioHandle *ioh);
|
||||||
extern const char *pgaio_io_get_op_name(PgAioHandle *ioh);
|
extern const char *pgaio_io_get_op_name(PgAioHandle *ioh);
|
||||||
|
extern bool pgaio_io_uses_fd(PgAioHandle *ioh, int fd);
|
||||||
|
|
||||||
/* aio_target.c */
|
/* aio_target.c */
|
||||||
extern bool pgaio_io_can_reopen(PgAioHandle *ioh);
|
extern bool pgaio_io_can_reopen(PgAioHandle *ioh);
|
||||||
@ -386,6 +396,9 @@ extern PgAioHandle *pgaio_inj_io_get(void);
|
|||||||
/* Declarations for the tables of function pointers exposed by each IO method. */
|
/* Declarations for the tables of function pointers exposed by each IO method. */
|
||||||
extern PGDLLIMPORT const IoMethodOps pgaio_sync_ops;
|
extern PGDLLIMPORT const IoMethodOps pgaio_sync_ops;
|
||||||
extern PGDLLIMPORT const IoMethodOps pgaio_worker_ops;
|
extern PGDLLIMPORT const IoMethodOps pgaio_worker_ops;
|
||||||
|
#ifdef IOMETHOD_IO_URING_ENABLED
|
||||||
|
extern PGDLLIMPORT const IoMethodOps pgaio_uring_ops;
|
||||||
|
#endif
|
||||||
|
|
||||||
extern PGDLLIMPORT const IoMethodOps *pgaio_method_ops;
|
extern PGDLLIMPORT const IoMethodOps *pgaio_method_ops;
|
||||||
extern PGDLLIMPORT PgAioCtl *pgaio_ctl;
|
extern PGDLLIMPORT PgAioCtl *pgaio_ctl;
|
||||||
|
src/include/storage/lwlock.h
@@ -218,6 +218,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SUBTRANS_SLRU,
 	LWTRANCHE_XACT_SLRU,
 	LWTRANCHE_PARALLEL_VACUUM_DSA,
+	LWTRANCHE_AIO_URING_COMPLETION,
 	LWTRANCHE_FIRST_USER_DEFINED,
 } BuiltinTrancheIds;
src/tools/pgindent/typedefs.list
@@ -2151,6 +2151,7 @@ PgAioReturn
 PgAioTargetData
 PgAioTargetID
 PgAioTargetInfo
+PgAioUringContext
 PgAioWaitRef
 PgArchData
 PgBackendGSSStatus