1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

aio: Add errcontext for processing I/Os for another backend

Push an ErrorContextCallback adding additional detail about the process
performing the I/O and the owner of the I/O when those are not the same.

For io_method worker, this adds context specifying which process owns
the I/O that the I/O worker is processing.

For io_method io_uring, this adds context only when a backend is
*completing* I/O for another backend. It specifies the pid of the owning
process.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/rdml3fpukrqnas7qc5uimtl2fyytrnu6ymc2vjf2zuflbsjuul%40hyizyjsexwmm
This commit is contained in:
Melanie Plageman
2025-04-01 19:53:07 -04:00
parent b136db07c6
commit b3219c69fc
2 changed files with 60 additions and 0 deletions

View File

@ -302,14 +302,41 @@ pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
return num_staged_ios; return num_staged_ios;
} }
/*
 * Error context callback used while draining io_uring completions.
 *
 * "arg" is the PgAioHandle currently being completed, or NULL when no
 * completion is in progress.  Context is only emitted when the handle is
 * owned by a different process than the one running the completion; the
 * message then reports the owning process's pid.
 */
static void
pgaio_uring_completion_error_callback(void *arg)
{
	PgAioHandle *handle = arg;

	/*
	 * Nothing to add if there is no handle, or if this backend is completing
	 * its own IO.
	 */
	if (!handle || handle->owner_procno == MyProcNumber)
		return;

	errcontext("completing I/O on behalf of process %d",
			   GetPGProcByNumber(handle->owner_procno)->pid);
}
static void static void
pgaio_uring_drain_locked(PgAioUringContext *context) pgaio_uring_drain_locked(PgAioUringContext *context)
{ {
int ready; int ready;
int orig_ready; int orig_ready;
ErrorContextCallback errcallback = {0};
Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE)); Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));
errcallback.callback = pgaio_uring_completion_error_callback;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* /*
* Don't drain more events than available right now. Otherwise it's * Don't drain more events than available right now. Otherwise it's
* plausible that one backend could get stuck, for a while, receiving CQEs * plausible that one backend could get stuck, for a while, receiving CQEs
@ -337,9 +364,11 @@ pgaio_uring_drain_locked(PgAioUringContext *context)
PgAioHandle *ioh; PgAioHandle *ioh;
ioh = io_uring_cqe_get_data(cqe); ioh = io_uring_cqe_get_data(cqe);
errcallback.arg = ioh;
io_uring_cqe_seen(&context->io_uring_ring, cqe); io_uring_cqe_seen(&context->io_uring_ring, cqe);
pgaio_io_process_completion(ioh, cqe->res); pgaio_io_process_completion(ioh, cqe->res);
errcallback.arg = NULL;
} }
END_CRIT_SECTION(); END_CRIT_SECTION();
@ -348,6 +377,8 @@ pgaio_uring_drain_locked(PgAioUringContext *context)
"drained %d/%d, now expecting %d", "drained %d/%d, now expecting %d",
ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring)); ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
} }
error_context_stack = errcallback.previous;
} }
static void static void

View File

@ -357,11 +357,33 @@ pgaio_worker_register(void)
on_shmem_exit(pgaio_worker_die, 0); on_shmem_exit(pgaio_worker_die, 0);
} }
/*
 * Error context callback for I/O workers.
 *
 * "arg" is the PgAioHandle the worker is currently processing, or NULL when
 * no I/O is being processed.  When a handle is set, the emitted context
 * reports the pid of the process that owns the I/O.
 */
static void
pgaio_worker_error_callback(void *arg)
{
	PgAioHandle *handle = arg;
	PGPROC	   *proc;

	/* No I/O in flight, so there is no owner to report. */
	if (!handle)
		return;

	/* This callback is only expected to run in a worker, for foreign I/O. */
	Assert(handle->owner_procno != MyProcNumber);
	Assert(MyBackendType == B_IO_WORKER);

	proc = GetPGProcByNumber(handle->owner_procno);

	errcontext("I/O worker executing I/O on behalf of process %d", proc->pid);
}
void void
IoWorkerMain(const void *startup_data, size_t startup_data_len) IoWorkerMain(const void *startup_data, size_t startup_data_len)
{ {
sigjmp_buf local_sigjmp_buf; sigjmp_buf local_sigjmp_buf;
PgAioHandle *volatile error_ioh = NULL; PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0; volatile int error_errno = 0;
char cmd[128]; char cmd[128];
@ -388,6 +410,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
sprintf(cmd, "%d", MyIoWorkerId); sprintf(cmd, "%d", MyIoWorkerId);
set_ps_display(cmd); set_ps_display(cmd);
errcallback.callback = pgaio_worker_error_callback;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* see PostgresMain() */ /* see PostgresMain() */
if (sigsetjmp(local_sigjmp_buf, 1) != 0) if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{ {
@ -471,6 +497,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
ioh = &pgaio_ctl->io_handles[io_index]; ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh; error_ioh = ioh;
errcallback.arg = ioh;
pgaio_debug_io(DEBUG4, ioh, pgaio_debug_io(DEBUG4, ioh,
"worker %d processing IO", "worker %d processing IO",
@ -511,6 +538,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
pgaio_io_perform_synchronously(ioh); pgaio_io_perform_synchronously(ioh);
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
errcallback.arg = NULL;
} }
else else
{ {
@ -522,6 +550,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
} }
error_context_stack = errcallback.previous;
proc_exit(0); proc_exit(0);
} }