1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-19 13:42:17 +03:00

aio: Infrastructure for io_method=worker

This commit contains the basic, system-wide, infrastructure for
io_method=worker. It does not yet actually execute IO; this commit just
provides the infrastructure for running IO workers, kept separate for easier
review.

The number of IO workers can be adjusted with a PGC_SIGHUP GUC. Eventually
we'd like to make the number of workers dynamically scale up/down based on the
current "IO load".

To allow the number of IO workers to be increased without a restart, we need
to reserve PGPROC entries for the workers unconditionally. This has been
judged to be worth the cost. If it turns out to be problematic, we can
introduce a PGC_POSTMASTER GUC to control the maximum number.

As IO workers might be needed during shutdown, e.g. for AIO during the
shutdown checkpoint, a new PMState phase is added. IO workers are shut down
after the shutdown checkpoint has been performed and walsender/archiver have
shut down, but before the checkpointer itself shuts down. See also
87a6690cc6.

Updates PGSTAT_FILE_FORMAT_ID due to the addition of a new BackendType.

Reviewed-by: Noah Misch <noah@leadboat.com>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Co-authored-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
This commit is contained in:
Andres Freund
2025-03-18 10:52:33 -04:00
parent 549ea06e42
commit 55b454d0e1
20 changed files with 342 additions and 15 deletions

View File

@@ -48,6 +48,7 @@
#include "replication/slotsync.h"
#include "replication/walreceiver.h"
#include "storage/dsm.h"
#include "storage/io_worker.h"
#include "storage/pg_shmem.h"
#include "tcop/backend_startup.h"
#include "utils/memutils.h"
@@ -197,6 +198,7 @@ static child_process_kind child_process_kinds[] = {
[B_ARCHIVER] = {"archiver", PgArchiverMain, true},
[B_BG_WRITER] = {"bgwriter", BackgroundWriterMain, true},
[B_CHECKPOINTER] = {"checkpointer", CheckpointerMain, true},
[B_IO_WORKER] = {"io_worker", IoWorkerMain, true},
[B_STARTUP] = {"startup", StartupProcessMain, true},
[B_WAL_RECEIVER] = {"wal_receiver", WalReceiverMain, true},
[B_WAL_SUMMARIZER] = {"wal_summarizer", WalSummarizerMain, true},

View File

@@ -101,6 +101,7 @@ InitPostmasterChildSlots(void)
pmchild_pools[B_AUTOVAC_WORKER].size = autovacuum_worker_slots;
pmchild_pools[B_BG_WORKER].size = max_worker_processes;
pmchild_pools[B_IO_WORKER].size = MAX_IO_WORKERS;
/*
* There can be only one of each of these running at a time. They each

View File

@@ -108,9 +108,12 @@
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/walsender.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/backend_startup.h"
#include "tcop/tcopprot.h"
#include "utils/datetime.h"
@@ -340,6 +343,7 @@ typedef enum
* ckpt */
PM_WAIT_XLOG_ARCHIVAL, /* waiting for archiver and walsenders to
* finish */
PM_WAIT_IO_WORKERS, /* waiting for io workers to exit */
PM_WAIT_CHECKPOINTER, /* waiting for checkpointer to shut down */
PM_WAIT_DEAD_END, /* waiting for dead-end children to exit */
PM_NO_CHILDREN, /* all important children have exited */
@@ -402,6 +406,10 @@ bool LoadedSSL = false;
static DNSServiceRef bonjour_sdref = NULL;
#endif
/*
 * State for IO worker management.
 *
 * io_worker_children[] holds one PMChild pointer per running IO worker, with
 * NULL marking an unused entry.  io_worker_count is kept equal to the number
 * of non-NULL entries: both are updated together when a worker is started
 * (maybe_adjust_io_workers) or reaped (maybe_reap_io_worker).
 */
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
/*
* postmaster.c - function prototypes
*/
@@ -436,6 +444,8 @@ static void TerminateChildren(int signal);
static int CountChildren(BackendTypeMask targetMask);
static void LaunchMissingBackgroundProcesses(void);
static void maybe_start_bgworkers(void);
static bool maybe_reap_io_worker(int pid);
static void maybe_adjust_io_workers(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static PMChild *StartChildProcess(BackendType type);
static void StartSysLogger(void);
@@ -1365,6 +1375,11 @@ PostmasterMain(int argc, char *argv[])
*/
AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_STARTING);
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
maybe_adjust_io_workers();
/* Start bgwriter and checkpointer so they can help with recovery */
if (CheckpointerPMChild == NULL)
CheckpointerPMChild = StartChildProcess(B_CHECKPOINTER);
@@ -1377,7 +1392,6 @@ PostmasterMain(int argc, char *argv[])
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
StartupStatus = STARTUP_RUNNING;
UpdatePMState(PM_STARTUP);
/* Some workers may be scheduled to start now */
maybe_start_bgworkers();
@@ -2502,6 +2516,16 @@ process_pm_child_exit(void)
continue;
}
/* Was it an IO worker? */
if (maybe_reap_io_worker(pid))
{
if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
HandleChildCrash(pid, exitstatus, _("io worker"));
maybe_adjust_io_workers();
continue;
}
/*
* Was it a backend or a background worker?
*/
@@ -2723,6 +2747,7 @@ HandleFatalError(QuitSignalReason reason, bool consider_sigabrt)
case PM_WAIT_XLOG_SHUTDOWN:
case PM_WAIT_XLOG_ARCHIVAL:
case PM_WAIT_CHECKPOINTER:
case PM_WAIT_IO_WORKERS:
/*
* NB: Similar code exists in PostmasterStateMachine()'s handling
@@ -2905,20 +2930,21 @@ PostmasterStateMachine(void)
/*
* If we are doing crash recovery or an immediate shutdown then we
* expect archiver, checkpointer and walsender to exit as well,
* otherwise not.
* expect archiver, checkpointer, io workers and walsender to exit as
* well, otherwise not.
*/
if (FatalError || Shutdown >= ImmediateShutdown)
targetMask = btmask_add(targetMask,
B_CHECKPOINTER,
B_ARCHIVER,
B_IO_WORKER,
B_WAL_SENDER);
/*
* Normally walsenders and archiver will continue running; they will
* be terminated later after writing the checkpoint record. We also
* let dead-end children to keep running for now. The syslogger
* process exits last.
* Normally archiver, checkpointer, IO workers and walsenders will
* continue running; they will be terminated later after writing the
* checkpoint record. We also let dead-end children keep running for
* now. The syslogger process exits last.
*
* This assertion checks that we have covered all backend types,
* either by including them in targetMask, or by noting here that they
@@ -2933,12 +2959,13 @@ PostmasterStateMachine(void)
B_LOGGER);
/*
* Archiver, checkpointer and walsender may or may not be in
* targetMask already.
* Archiver, checkpointer, IO workers, and walsender may or may
* not be in targetMask already.
*/
remainMask = btmask_add(remainMask,
B_ARCHIVER,
B_CHECKPOINTER,
B_IO_WORKER,
B_WAL_SENDER);
/* these are not real postmaster children */
@@ -3039,11 +3066,25 @@ PostmasterStateMachine(void)
{
/*
* PM_WAIT_XLOG_ARCHIVAL state ends when there are no children other
* than checkpointer, dead-end children and logger left. There
* than checkpointer, io workers and dead-end children left. There
* shouldn't be any regular backends left by now anyway; what we're
* really waiting for is for walsenders and archiver to exit.
*/
if (CountChildren(btmask_all_except(B_CHECKPOINTER, B_LOGGER, B_DEAD_END_BACKEND)) == 0)
if (CountChildren(btmask_all_except(B_CHECKPOINTER, B_IO_WORKER,
B_LOGGER, B_DEAD_END_BACKEND)) == 0)
{
UpdatePMState(PM_WAIT_IO_WORKERS);
SignalChildren(SIGUSR2, btmask(B_IO_WORKER));
}
}
if (pmState == PM_WAIT_IO_WORKERS)
{
/*
* PM_WAIT_IO_WORKERS state ends when there's only checkpointer and
* dead_end children left.
*/
if (io_worker_count == 0)
{
UpdatePMState(PM_WAIT_CHECKPOINTER);
@@ -3171,10 +3212,14 @@ PostmasterStateMachine(void)
/* re-create shared memory and semaphores */
CreateSharedMemoryAndSemaphores();
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
maybe_adjust_io_workers();
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
StartupStatus = STARTUP_RUNNING;
UpdatePMState(PM_STARTUP);
/* crash recovery started, reset SIGKILL flag */
AbortStartTime = 0;
@@ -3198,6 +3243,7 @@ pmstate_name(PMState state)
PM_TOSTR_CASE(PM_WAIT_BACKENDS);
PM_TOSTR_CASE(PM_WAIT_XLOG_SHUTDOWN);
PM_TOSTR_CASE(PM_WAIT_XLOG_ARCHIVAL);
PM_TOSTR_CASE(PM_WAIT_IO_WORKERS);
PM_TOSTR_CASE(PM_WAIT_DEAD_END);
PM_TOSTR_CASE(PM_WAIT_CHECKPOINTER);
PM_TOSTR_CASE(PM_NO_CHILDREN);
@@ -3235,6 +3281,16 @@ LaunchMissingBackgroundProcesses(void)
if (SysLoggerPMChild == NULL && Logging_collector)
StartSysLogger();
/*
* The number of configured workers might have changed, or a prior start
* of a worker might have failed. Check if we need to start/stop any
* workers.
*
* A config file change will always lead to this function being called, so
* we always will process the config change in a timely manner.
*/
maybe_adjust_io_workers();
/*
* The checkpointer and the background writer are active from the start,
* until shutdown is initiated.
@@ -4120,6 +4176,7 @@ bgworker_should_start_now(BgWorkerStartTime start_time)
case PM_WAIT_DEAD_END:
case PM_WAIT_XLOG_ARCHIVAL:
case PM_WAIT_XLOG_SHUTDOWN:
case PM_WAIT_IO_WORKERS:
case PM_WAIT_BACKENDS:
case PM_STOP_BACKENDS:
break;
@@ -4270,6 +4327,99 @@ maybe_start_bgworkers(void)
}
}
static bool
maybe_reap_io_worker(int pid)
{
for (int id = 0; id < MAX_IO_WORKERS; ++id)
{
if (io_worker_children[id] &&
io_worker_children[id]->pid == pid)
{
ReleasePostmasterChildSlot(io_worker_children[id]);
--io_worker_count;
io_worker_children[id] = NULL;
return true;
}
}
return false;
}
/*
 * Bring the number of running IO workers in line with the configured number
 * (the io_workers GUC).
 *
 * This covers both directions: launching additional workers when too few are
 * running (after the GUC was raised, or because a worker terminated and
 * needs a "replacement"), and asking a worker to exit when too many are
 * running (after the GUC was lowered).
 *
 * Does nothing if worker-based AIO is not enabled, or if the postmaster is
 * in a state in which IO workers must not be (re)started.
 */
static void
maybe_adjust_io_workers(void)
{
	if (!pgaio_workers_enabled())
		return;

	/*
	 * Leave the set of workers alone in any of these situations:
	 *
	 * - we're in the final shutting down states, just waiting for all
	 * processes to exit;
	 *
	 * - an immediate shutdown is in progress;
	 *
	 * - we're in the shutdown phase of a crash restart.  (Once we're starting
	 * up again after the crash, workers *do* need to be started.)
	 */
	if (pmState >= PM_WAIT_IO_WORKERS ||
		Shutdown >= ImmediateShutdown ||
		(FatalError && pmState >= PM_STOP_BACKENDS))
		return;

	Assert(pmState < PM_WAIT_IO_WORKERS);

	/* Too few workers running: launch more until the target is reached. */
	while (io_worker_count < io_workers)
	{
		PMChild    *started;
		int			free_slot = 0;

		/* look for an unused entry in io_worker_children[] */
		while (free_slot < MAX_IO_WORKERS &&
			   io_worker_children[free_slot] != NULL)
			free_slot++;

		if (free_slot == MAX_IO_WORKERS)
			elog(ERROR, "could not find a free IO worker ID");

		started = StartChildProcess(B_IO_WORKER);
		if (started == NULL)
			break;				/* launch failed; XXX try again soon? */

		io_worker_children[free_slot] = started;
		++io_worker_count;
	}

	/*
	 * Too many workers running: ask the one in the highest occupied slot to
	 * exit.  Only a single worker is signalled per call.
	 */
	if (io_worker_count > io_workers)
	{
		int			victim = MAX_IO_WORKERS - 1;

		while (victim >= 0 && io_worker_children[victim] == NULL)
			victim--;

		if (victim >= 0)
			kill(io_worker_children[victim]->pid, SIGUSR2);
	}
}
/*
* When a backend asks to be notified about worker state changes, we
* set a flag in its backend entry. The background worker machinery needs