1
0
mirror of https://github.com/postgres/postgres.git synced 2026-01-27 21:43:08 +03:00

Allow bgworkers to be terminated for database-related commands

Background workers gain a new flag, called BGWORKER_INTERRUPTIBLE, that
offers the possibility to terminate the workers when these are connected
to a database that is involved in one of the following commands:
ALTER DATABASE RENAME TO
ALTER DATABASE SET TABLESPACE
CREATE DATABASE
DROP DATABASE

This is useful to give background workers the same behavior as backends
and autovacuum workers, which are stopped when these commands are
executed.  The default behavior, that exists since 9.3, is still to
never terminate bgworkers connected to the database involved in any of
these commands.  The new flag has to be set to terminate the workers.

A couple of tests are added to worker_spi to track the commands that
impact the termination of the workers.  There is a test case for a
non-interruptible worker, additionally, that relies on an injection
point to reduce the wait time in CountOtherDBBackends() from 5s to
0.3s for faster test runs.  The tests rely on the contents of the server
logs to check if a worker has been started or terminated:
- LOG generated by worker_spi_main() at startup, once connection to
database is done.
- FATAL in bgworker_die() when terminated.
A couple of test runs in the CI have shown that this method is stable
enough.  The safe_psql() calls that scan pg_stat_activity could be
replaced with some poll_query_until() for more stability, if the current
method proves to be an issue in the buildfarm.

Author: Aya Iwata <iwata.aya@fujitsu.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Ryo Matsumura <matsumura.ryo@fujitsu.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Discussion: https://postgr.es/m/OS7PR01MB11964335F36BE41021B62EAE8EAE4A@OS7PR01MB11964.jpnprd01.prod.outlook.com
This commit is contained in:
Michael Paquier
2026-01-06 14:24:29 +09:00
parent c970bdc037
commit f1e251be80
9 changed files with 274 additions and 6 deletions

View File

@@ -108,6 +108,29 @@ typedef struct BackgroundWorker
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>BGWORKER_INTERRUPTIBLE</literal></term>
<listitem>
<para>
<indexterm><primary>BGWORKER_INTERRUPTIBLE</primary></indexterm>
Requests termination of the background worker when its connected database is
dropped, renamed, moved to a different tablespace, or used as a template for
<command>CREATE DATABASE</command>. Specifically, the postmaster sends a
termination signal when any of these commands affects the worker's database:
<itemizedlist>
<listitem><para><command>DROP DATABASE</command></para></listitem>
<listitem><para><command>ALTER DATABASE RENAME
TO</command></para></listitem>
<listitem><para><command>ALTER DATABASE SET
TABLESPACE</command></para></listitem>
<listitem><para><command>CREATE DATABASE</command></para></listitem>
</itemizedlist>
Requires both <literal>BGWORKER_SHMEM_ACCESS</literal> and
<literal>BGWORKER_BACKEND_DATABASE_CONNECTION</literal>.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</para> </para>

View File

@@ -26,6 +26,7 @@
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h" #include "storage/procsignal.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
@@ -665,6 +666,17 @@ SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
/* XXX other checks? */ /* XXX other checks? */
} }
/* Interruptible workers require a database connection */
if ((worker->bgw_flags & BGWORKER_INTERRUPTIBLE) &&
!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION))
{
ereport(elevel,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("background worker \"%s\": cannot make background workers interruptible without database access",
worker->bgw_name)));
return false;
}
if ((worker->bgw_restart_time < 0 && if ((worker->bgw_restart_time < 0 &&
worker->bgw_restart_time != BGW_NEVER_RESTART) || worker->bgw_restart_time != BGW_NEVER_RESTART) ||
(worker->bgw_restart_time > USECS_PER_DAY / 1000)) (worker->bgw_restart_time > USECS_PER_DAY / 1000))
@@ -1399,3 +1411,42 @@ GetBackgroundWorkerTypeByPid(pid_t pid)
return result; return result;
} }
/*
 * TerminateBackgroundWorkersForDatabase
 *
 * Flag for termination every in-use background worker slot that carries
 * BGWORKER_INTERRUPTIBLE and whose process is attached to "databaseId".
 * Workers without the flag are left untouched.
 *
 * The slots are only marked here; the postmaster is signaled once, after the
 * lock has been released, and only if at least one slot was marked.
 */
void
TerminateBackgroundWorkersForDatabase(Oid databaseId)
{
	bool		signal_postmaster = false;

	/* Exclusive mode, as slot->terminate is written below. */
	LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);

	for (int slotno = 0; slotno < BackgroundWorkerData->total_slots; slotno++)
	{
		BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
		PGPROC	   *proc;

		/* Unused slots have nothing to terminate. */
		if (!slot->in_use)
			continue;

		/* Only workers that opted in can be interrupted. */
		if ((slot->worker.bgw_flags & BGWORKER_INTERRUPTIBLE) == 0)
			continue;

		/* The PID lookup may find nothing, or a process on another database. */
		proc = BackendPidGetProc(slot->pid);
		if (proc == NULL || proc->databaseId != databaseId)
			continue;

		slot->terminate = true;
		signal_postmaster = true;
	}

	LWLockRelease(BackgroundWorkerLock);

	/* Make sure the postmaster notices the change to shared memory. */
	if (signal_postmaster)
		SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
}

View File

@@ -56,11 +56,13 @@
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "postmaster/bgworker.h"
#include "port/pg_lfind.h" #include "port/pg_lfind.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
@@ -3687,8 +3689,10 @@ CountUserBackends(Oid roleid)
* CountOtherDBBackends -- check for other backends running in the given DB * CountOtherDBBackends -- check for other backends running in the given DB
* *
* If there are other backends in the DB, we will wait a maximum of 5 seconds * If there are other backends in the DB, we will wait a maximum of 5 seconds
* for them to exit. Autovacuum backends are encouraged to exit early by * for them to exit (or 0.3s for testing purposes). Autovacuum backends are
* sending them SIGTERM, but normal user backends are just waited for. * encouraged to exit early by sending them SIGTERM, but normal user backends
* are just waited for. If background workers connected to this database are
* marked as interruptible, they are terminated.
* *
* The current backend is always ignored; it is caller's responsibility to * The current backend is always ignored; it is caller's responsibility to
* check whether the current backend uses the given DB, if it's important. * check whether the current backend uses the given DB, if it's important.
@@ -3713,10 +3717,19 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
#define MAXAUTOVACPIDS 10 /* max autovacs to SIGTERM per iteration */ #define MAXAUTOVACPIDS 10 /* max autovacs to SIGTERM per iteration */
int autovac_pids[MAXAUTOVACPIDS]; int autovac_pids[MAXAUTOVACPIDS];
int tries;
/* 50 tries with 100ms sleep between tries makes 5 sec total wait */ /*
for (tries = 0; tries < 50; tries++) * Retry up to 50 times with 100ms between attempts (max 5s total). Can be
* reduced to 3 attempts (max 0.3s total) to speed up tests.
*/
int ntries = 50;
#ifdef USE_INJECTION_POINTS
if (IS_INJECTION_POINT_ATTACHED("procarray-reduce-count"))
ntries = 3;
#endif
for (int tries = 0; tries < ntries; tries++)
{ {
int nautovacs = 0; int nautovacs = 0;
bool found = false; bool found = false;
@@ -3766,6 +3779,12 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
for (index = 0; index < nautovacs; index++) for (index = 0; index < nautovacs; index++)
(void) kill(autovac_pids[index], SIGTERM); /* ignore any error */ (void) kill(autovac_pids[index], SIGTERM); /* ignore any error */
/*
* Terminate all background workers for this database, if they have
* requested it (BGWORKER_INTERRUPTIBLE).
*/
TerminateBackgroundWorkersForDatabase(databaseId);
/* sleep, then try again */ /* sleep, then try again */
pg_usleep(100 * 1000L); /* 100ms */ pg_usleep(100 * 1000L); /* 100ms */
} }

View File

@@ -59,6 +59,13 @@
*/ */
#define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002 #define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002
/*
* Exit the bgworker if its database is involved in a CREATE, ALTER or DROP
* database command. It requires BGWORKER_SHMEM_ACCESS and
* BGWORKER_BACKEND_DATABASE_CONNECTION.
*/
#define BGWORKER_INTERRUPTIBLE 0x0004
/* /*
* This class is used internally for parallel queries, to keep track of the * This class is used internally for parallel queries, to keep track of the
* number of active parallel workers and make sure we never launch more than * number of active parallel workers and make sure we never launch more than
@@ -129,6 +136,9 @@ extern const char *GetBackgroundWorkerTypeByPid(pid_t pid);
/* Terminate a bgworker */ /* Terminate a bgworker */
extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle); extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);
/* Terminate background workers connected to database */
extern void TerminateBackgroundWorkersForDatabase(Oid databaseId);
/* This is valid in a running worker */ /* This is valid in a running worker */
extern PGDLLIMPORT BackgroundWorker *MyBgworkerEntry; extern PGDLLIMPORT BackgroundWorker *MyBgworkerEntry;

View File

@@ -6,6 +6,10 @@ EXTENSION = worker_spi
DATA = worker_spi--1.0.sql DATA = worker_spi--1.0.sql
PGFILEDESC = "worker_spi - background worker example" PGFILEDESC = "worker_spi - background worker example"
EXTRA_INSTALL = src/test/modules/injection_points
export enable_injection_points
TAP_TESTS = 1 TAP_TESTS = 1
ifdef USE_PGXS ifdef USE_PGXS

View File

@@ -26,8 +26,12 @@ tests += {
'sd': meson.current_source_dir(), 'sd': meson.current_source_dir(),
'bd': meson.current_build_dir(), 'bd': meson.current_build_dir(),
'tap': { 'tap': {
'env': {
'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
},
'tests': [ 'tests': [
't/001_worker_spi.pl', 't/001_worker_spi.pl',
't/002_worker_terminate.pl'
], ],
}, },
} }

View File

@@ -0,0 +1,151 @@
# Copyright (c) 2026, PostgreSQL Global Development Group
# Test background workers can be terminated by db commands
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# This test depends on injection points to detect whether background workers
# remain.  The variable is exported by the build system; treat an unset value
# like "no" so that a manual run skips cleanly instead of dying on an
# uninitialized-value warning (warnings are FATAL in this script).
if (($ENV{enable_injection_points} // '') ne 'yes')
{
	plan skip_all => 'Injection points not supported by this build';
}
# Ensure the worker_spi dynamic worker is launched on the specified database.
#
# Arguments:
#   $node          - cluster node used to run the queries and scan the log
#   $database      - database the worker connects to; the launch query is
#                    also issued from this database
#   $testcase      - index given to worker_spi_launch(), also used to
#                    recognize the worker's startup message in the server log
#   $interruptible - SQL boolean literal ('true' or 'false') given as the
#                    "interruptible" argument of worker_spi_launch()
#
# Emits one test result checking that the worker is visible in
# pg_stat_activity.  Returns the PID of the worker launched.
sub launch_bgworker
{
	my ($node, $database, $testcase, $interruptible) = @_;

	# Only log entries written from this point on are of interest.
	my $offset = -s $node->logfile;

	# Launch a background worker on the given database.
	my $pid = $node->safe_psql(
		$database, qq(
        SELECT worker_spi_launch($testcase, '$database'::regdatabase, 0, '{}', $interruptible);
    ));

	# Check that the bgworker is initialized.  The amount of whitespace
	# following the severity tag is not hardcoded, so that the pattern
	# matches however the server pads the prefix of the log line.
	$node->wait_for_log(
		qr/LOG:\s+worker_spi dynamic worker $testcase initialized with .*\..*/,
		$offset);

	my $result = $node->safe_psql($database,
		"SELECT count(*) > 0 FROM pg_stat_activity WHERE pid = $pid;");
	is($result, 't', "dynamic bgworker $testcase launched");

	return $pid;
}
# Run query and verify that the bgworker with the specified PID has been
# terminated.
#
# Arguments:
#   $node     - cluster node used to run the queries and scan the log
#   $command  - SQL command expected to cause the worker's termination; it is
#               run on the "postgres" database
#   $testname - label appended to the name of the emitted test result
#   $pid      - PID of the worker expected to go away
#
# Emits one test result.  No meaningful value is returned.
sub run_bgworker_interruptible_test
{
	my ($node, $command, $testname, $pid) = @_;

	# Only log entries written from this point on are of interest.
	my $offset = -s $node->logfile;

	$node->safe_psql('postgres', $command);

	$node->wait_for_log(
		qr/terminating background worker \"worker_spi dynamic\" due to administrator command/,
		$offset);

	# The termination message can reach the log before the worker's entry is
	# gone from pg_stat_activity, so poll until the entry disappears instead
	# of relying on a single lookup.
	ok( $node->poll_query_until(
			'postgres',
			"SELECT count(*) = 0 FROM pg_stat_activity WHERE pid = $pid;"),
		"dynamic bgworker stopped for $testname");

	return;
}
my $node = PostgreSQL::Test::Cluster->new('mynode');
$node->init;
$node->start;

# Check if the extension injection_points is available, as it may be
# possible that this script is run with installcheck, where the module
# would not be installed by default.
if (!$node->check_extension('injection_points'))
{
	plan skip_all => 'Extension injection_points not installed';
}

$node->safe_psql('postgres', 'CREATE EXTENSION worker_spi;');

# Launch a background worker without BGWORKER_INTERRUPTIBLE.
my $pid = launch_bgworker($node, 'postgres', 0, 'false');

# Ensure CREATE DATABASE WITH TEMPLATE fails because a non-interruptible
# bgworker exists.

# The injection point 'procarray-reduce-count' reduces the number of backend
# retries, allowing for shorter test runs.  See CountOtherDBBackends().
$node->safe_psql('postgres', "CREATE EXTENSION injection_points;");
$node->safe_psql('postgres',
	"SELECT injection_points_attach('procarray-reduce-count', 'error');");

my $stderr;
$node->psql(
	'postgres',
	"CREATE DATABASE testdb WITH TEMPLATE postgres",
	stderr => \$stderr);

# like() reports the captured output on failure, and copes with nothing
# having been captured at all.
like(
	$stderr,
	qr/source database "postgres" is being accessed by other users/,
	"background worker blocked the database creation");

# Confirm that the non-interruptible bgworker is still running.
my $result = $node->safe_psql(
	"postgres", qq(
        SELECT count(1) FROM pg_stat_activity
		WHERE backend_type = 'worker_spi dynamic';));

is($result, '1',
	"background worker is still running after CREATE DATABASE WITH TEMPLATE");

# Terminate the non-interruptible worker for the next tests.
$node->safe_psql(
	"postgres", qq(
        SELECT pg_terminate_backend(pid)
		FROM pg_stat_activity WHERE backend_type = 'worker_spi dynamic';));

# Wait until that worker is gone.  The following tests look in the server
# log for a worker termination message, and a late message from this worker
# must not be mistaken for the one they expect.
$node->poll_query_until(
	"postgres", qq(
        SELECT count(1) = 0 FROM pg_stat_activity
		WHERE backend_type = 'worker_spi dynamic';))
  or die "timed out waiting for the non-interruptible worker to exit";

# The injection point is not used anymore, release it.
$node->safe_psql('postgres',
	"SELECT injection_points_detach('procarray-reduce-count');");

# Check that BGWORKER_INTERRUPTIBLE allows background workers to be
# terminated with database-related commands.

# Test case 1: CREATE DATABASE WITH TEMPLATE
$pid = launch_bgworker($node, 'postgres', 1, 'true');
run_bgworker_interruptible_test(
	$node,
	"CREATE DATABASE testdb WITH TEMPLATE postgres",
	"CREATE DATABASE WITH TEMPLATE", $pid);

# Test case 2: ALTER DATABASE RENAME
$pid = launch_bgworker($node, 'testdb', 2, 'true');
run_bgworker_interruptible_test(
	$node,
	"ALTER DATABASE testdb RENAME TO renameddb",
	"ALTER DATABASE RENAME", $pid);

# Preparation for the next test, create a tablespace.
my $tablespace = PostgreSQL::Test::Utils::tempdir;
$node->safe_psql('postgres',
	"CREATE TABLESPACE test_tablespace LOCATION '$tablespace'");

# Test case 3: ALTER DATABASE SET TABLESPACE
$pid = launch_bgworker($node, 'renameddb', 3, 'true');
run_bgworker_interruptible_test(
	$node,
	"ALTER DATABASE renameddb SET TABLESPACE test_tablespace",
	"ALTER DATABASE SET TABLESPACE", $pid);

# Test case 4: DROP DATABASE
$pid = launch_bgworker($node, 'renameddb', 4, 'true');
run_bgworker_interruptible_test(
	$node,
	"DROP DATABASE renameddb",
	"DROP DATABASE", $pid);

done_testing();

View File

@@ -7,7 +7,8 @@
CREATE FUNCTION worker_spi_launch(index int4, CREATE FUNCTION worker_spi_launch(index int4,
dboid oid DEFAULT 0, dboid oid DEFAULT 0,
roleoid oid DEFAULT 0, roleoid oid DEFAULT 0,
flags text[] DEFAULT '{}') flags text[] DEFAULT '{}',
interruptible boolean DEFAULT false)
RETURNS pg_catalog.int4 STRICT RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'
LANGUAGE C; LANGUAGE C;

View File

@@ -404,10 +404,15 @@ worker_spi_launch(PG_FUNCTION_ARGS)
Size ndim; Size ndim;
int nelems; int nelems;
Datum *datum_flags; Datum *datum_flags;
bool interruptible = PG_GETARG_BOOL(4);
memset(&worker, 0, sizeof(worker)); memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION; BGWORKER_BACKEND_DATABASE_CONNECTION;
if (interruptible)
worker.bgw_flags |= BGWORKER_INTERRUPTIBLE;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished; worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART; worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "worker_spi"); sprintf(worker.bgw_library_name, "worker_spi");