1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-05 07:21:24 +03:00

Track logrep apply workers' last start times to avoid useless waits.

Enforce wal_retrieve_retry_interval on a per-subscription basis,
rather than globally, and arrange to skip that delay in case of
an intentional worker exit.  This probably makes little difference
in the field, where apply workers wouldn't be restarted often;
but it has a significant impact on the runtime of our logical
replication regression tests (even though those tests use
artificially-small wal_retrieve_retry_interval settings already).

Nathan Bossart, with mostly-cosmetic editorialization by me

Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13
This commit is contained in:
Tom Lane
2023-01-22 14:08:46 -05:00
parent c9f7f92648
commit 5a3a95385b
9 changed files with 243 additions and 49 deletions

View File

@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
environments where the number of times an infrastructure is accessed
is taken into account.
</para>
<para>
In logical replication, this parameter also limits how often a failing
replication apply worker will be respawned.
</para>
</listitem>
</varlistentry>

View File

@ -2008,6 +2008,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to read or update information
about <quote>heavyweight</quote> locks.</entry>
</row>
<row>
<entry><literal>LogicalRepLauncherDSA</literal></entry>
<entry>Waiting to access logical replication launcher's dynamic shared
memory allocator.</entry>
</row>
<row>
<entry><literal>LogicalRepLauncherHash</literal></entry>
<entry>Waiting to access logical replication launcher's shared
hash table.</entry>
</row>
<row>
<entry><literal>LogicalRepWorker</literal></entry>
<entry>Waiting to read or update the state of logical replication

View File

@ -1504,6 +1504,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
list_free(subworkers);
/*
* Remove the no-longer-useful entry in the launcher's table of apply
* worker start times.
*
* If this transaction rolls back, the launcher might restart a failed
* apply worker before wal_retrieve_retry_interval milliseconds have
* elapsed, but that's pretty harmless.
*/
ApplyLauncherForgetWorkerStartTime(subid);
/*
* Cleanup of tablesync replication origins.
*

View File

@ -25,6 +25,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct
/* Supervisor process. */
pid_t launcher_pid;
/* Hash table holding last start times of subscriptions' apply workers. */
dsa_handle last_start_dsa;
dshash_table_handle last_start_dsh;
/* Background workers. */
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;
static LogicalRepCtxStruct *LogicalRepCtx;
/* an entry in the last-start-times shared hash table */
typedef struct LauncherLastStartTimesEntry
{
Oid subid; /* OID of logrep subscription (hash key) */
TimestampTz last_start_time; /* last time its apply worker was started */
} LauncherLastStartTimesEntry;
/* parameters for the last-start-times shared hash table */
static const dshash_parameters dsh_params = {
sizeof(Oid),
sizeof(LauncherLastStartTimesEntry),
dshash_memcmp,
dshash_memhash,
LWTRANCHE_LAUNCHER_HASH
};
static dsa_area *last_start_times_dsa = NULL;
static dshash_table *last_start_times = NULL;
static bool on_commit_launcher_wakeup = false;
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int logicalrep_pa_worker_count(Oid subid);
static bool on_commit_launcher_wakeup = false;
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
/*
@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void)
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
/* Initialize memory and spin locks for each worker slot. */
for (slot = 0; slot < max_logical_replication_workers; slot++)
{
@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void)
}
}
/*
* Initialize or attach to the dynamic shared hash table that stores the
* last-start times, if not already done.
* This must be called before accessing the table.
*/
static void
logicalrep_launcher_attach_dshmem(void)
{
	MemoryContext oldcontext;

	/*
	 * Quick exit if we already did this.  Both conditions must hold: the
	 * table must exist in shared memory AND this process must have attached
	 * its local mapping of it.
	 */
	if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID &&
		last_start_times != NULL)
		return;

	/* Otherwise, use a lock to ensure only one process creates the table. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	/*
	 * Be sure any local memory allocated by DSA routines is persistent.
	 * The dsa_area and dshash_table control structures must outlive the
	 * current (possibly transaction-scoped) memory context.
	 */
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
	{
		/* Initialize dynamic shared hash table for last-start times. */
		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
		/* Pin so the area survives even if all backends detach... */
		dsa_pin(last_start_times_dsa);
		/* ...and keep this backend's mapping for the rest of its life. */
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);

		/*
		 * Store handles in shared memory for other backends to use.  Note
		 * this must happen only after the table is fully created, since
		 * the quick-exit check above reads last_start_dsh without the lock.
		 */
		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
	}
	else if (!last_start_times)
	{
		/* Attach to existing dynamic shared hash table. */
		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
										 LogicalRepCtx->last_start_dsh, 0);
	}

	MemoryContextSwitchTo(oldcontext);
	LWLockRelease(LogicalRepWorkerLock);
}
/*
* Set the last-start time for the subscription.
*/
static void
ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
{
	LauncherLastStartTimesEntry *hentry;
	bool		have_entry;

	/* The shared hash table must be set up before we can touch it. */
	logicalrep_launcher_attach_dshmem();

	/*
	 * Upsert the subscription's entry.  dshash_find_or_insert returns the
	 * entry exclusively locked, so release it once the timestamp is stored.
	 */
	hentry = dshash_find_or_insert(last_start_times, &subid, &have_entry);
	hentry->last_start_time = start_time;
	dshash_release_lock(last_start_times, hentry);
}
/*
* Return the last-start time for the subscription, or 0 if there isn't one.
*/
static TimestampTz
ApplyLauncherGetWorkerStartTime(Oid subid)
{
	TimestampTz result = 0;
	LauncherLastStartTimesEntry *hentry;

	/* The shared hash table must be set up before we can touch it. */
	logicalrep_launcher_attach_dshmem();

	/*
	 * Look up the subscription's entry; if present, copy the timestamp out
	 * and drop the lock dshash_find acquired for us.  Otherwise leave the
	 * result as 0, meaning "no recorded start time".
	 */
	hentry = dshash_find(last_start_times, &subid, false);
	if (hentry != NULL)
	{
		result = hentry->last_start_time;
		dshash_release_lock(last_start_times, hentry);
	}

	return result;
}
/*
* Remove the last-start-time entry for the subscription, if one exists.
*
* This has two use-cases: to remove the entry related to a subscription
* that's been deleted or disabled (just to avoid leaking shared memory),
* and to allow immediate restart of an apply worker that has exited
* due to subscription parameter changes.
*/
void
ApplyLauncherForgetWorkerStartTime(Oid subid)
{
	/* The shared hash table must be set up before we can touch it. */
	logicalrep_launcher_attach_dshmem();

	/* Remove the entry if it exists; it's harmless if it doesn't. */
	(void) dshash_delete_key(last_start_times, &subid);
}
/*
* Wakeup the launcher on commit if requested.
*/
@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
void
ApplyLauncherMain(Datum main_arg)
{
TimestampTz last_start_time = 0;
ereport(DEBUG1,
(errmsg_internal("logical replication launcher started")));
@ -976,31 +1104,25 @@ ApplyLauncherMain(Datum main_arg)
ListCell *lc;
MemoryContext subctx;
MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
CHECK_FOR_INTERRUPTS();
now = GetCurrentTimestamp();
/* Limit the start retry to once a wal_retrieve_retry_interval */
if (TimestampDifferenceExceeds(last_start_time, now,
wal_retrieve_retry_interval))
{
/* Use temporary context for the database list and worker info. */
/* Use temporary context to avoid leaking memory across cycles. */
subctx = AllocSetContextCreate(TopMemoryContext,
"Logical Replication Launcher sublist",
ALLOCSET_DEFAULT_SIZES);
oldctx = MemoryContextSwitchTo(subctx);
/* search for subscriptions to start or stop. */
/* Start any missing workers for enabled subscriptions. */
sublist = get_subscription_list();
/* Start the missing workers for enabled subscriptions. */
foreach(lc, sublist)
{
Subscription *sub = (Subscription *) lfirst(lc);
LogicalRepWorker *w;
TimestampTz last_start;
TimestampTz now;
long elapsed;
if (!sub->enabled)
continue;
@ -1009,13 +1131,36 @@ ApplyLauncherMain(Datum main_arg)
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (w == NULL)
{
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
if (w != NULL)
continue; /* worker is running already */
/*
* If the worker is eligible to start now, launch it. Otherwise,
* adjust wait_time so that we'll wake up as soon as it can be
* started.
*
* Each subscription's apply worker can only be restarted once per
* wal_retrieve_retry_interval, so that errors do not cause us to
* repeatedly restart the worker as fast as possible. In cases
* where a restart is expected (e.g., subscription parameter
* changes), another process should remove the last-start entry
* for the subscription so that the worker can be restarted
* without waiting for wal_retrieve_retry_interval to elapse.
*/
last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
now = GetCurrentTimestamp();
if (last_start == 0 ||
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid, DSM_HANDLE_INVALID);
sub->owner, InvalidOid,
DSM_HANDLE_INVALID);
}
else
{
wait_time = Min(wait_time,
wal_retrieve_retry_interval - elapsed);
}
}
@ -1023,17 +1168,6 @@ ApplyLauncherMain(Datum main_arg)
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
MemoryContextDelete(subctx);
}
else
{
/*
* The wait in previous cycle was interrupted in less than
* wal_retrieve_retry_interval since last worker was started, this
* usually means crash of the worker, so we should retry in
* wal_retrieve_retry_interval again.
*/
wait_time = wal_retrieve_retry_interval;
}
/* Wait for more work. */
rc = WaitLatch(MyLatch,

View File

@ -628,8 +628,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
if (should_exit)
{
/*
* Reset the last-start time for this worker so that the launcher will
* restart it without waiting for wal_retrieve_retry_interval.
*/
ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
proc_exit(0);
}
}
/*
* Process possible state change(s) of tables that are being synchronized.

View File

@ -174,6 +174,7 @@
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
@ -3811,6 +3812,15 @@ apply_worker_exit(void)
return;
}
/*
* Reset the last-start time for this apply worker so that the launcher
* will restart it without waiting for wal_retrieve_retry_interval if the
* subscription is still active, and so that we won't leak that hash table
* entry if it isn't.
*/
if (!am_tablesync_worker())
ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
proc_exit(0);
}
@ -3851,6 +3861,9 @@ maybe_reread_subscription(void)
(errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
get_worker_name(), MySubscription->name)));
/* Ensure we remove no-longer-useful entry for worker's start time */
if (!am_tablesync_worker() && !am_parallel_apply_worker())
ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
proc_exit(0);
}
@ -4421,6 +4434,9 @@ InitializeApplyWorker(void)
(errmsg("%s for subscription %u will not start because the subscription was removed during startup",
get_worker_name(), MyLogicalRepWorker->subid)));
/* Ensure we remove no-longer-useful entry for worker's start time */
if (!am_tablesync_worker() && !am_parallel_apply_worker())
ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
proc_exit(0);
}
@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void)
DisableSubscription(MySubscription->oid);
CommitTransactionCommand();
/* Ensure we remove no-longer-useful entry for worker's start time */
if (!am_tablesync_worker() && !am_parallel_apply_worker())
ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
/* Notify the subscription has been disabled and exit */
ereport(LOG,
errmsg("subscription \"%s\" has been disabled because of an error",

View File

@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
"PgStatsHash",
/* LWTRANCHE_PGSTATS_DATA: */
"PgStatsData",
/* LWTRANCHE_LAUNCHER_DSA: */
"LogicalRepLauncherDSA",
/* LWTRANCHE_LAUNCHER_HASH: */
"LogicalRepLauncherHash",
};
StaticAssertDecl(lengthof(BuiltinTrancheNames) ==

View File

@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg);
extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
extern void ApplyLauncherWakeupAtCommit(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);

View File

@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_PGSTATS_DSA,
LWTRANCHE_PGSTATS_HASH,
LWTRANCHE_PGSTATS_DATA,
LWTRANCHE_LAUNCHER_DSA,
LWTRANCHE_LAUNCHER_HASH,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;