diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index dc9b78b0b7d..f985afc009d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
         </para>
+        <para>
+         In logical replication, this parameter also limits how often a
+         failing replication apply worker will be respawned.
+        </para>
        </listitem>
       </varlistentry>
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -894,6 +921,10 @@ ApplyLauncherShmemInit(void)
 
 		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
 
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
+
 		/* Initialize memory and spin locks for each worker slot. */
 		for (slot = 0; slot < max_logical_replication_workers; slot++)
 		{
@@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void)
 	}
 }
 
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.
+ * This must be called before accessing the table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+	MemoryContext oldcontext;
+
+	/* Quick exit if we already did this. */
+	if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID &&
+		last_start_times != NULL)
+		return;
+
+	/* Otherwise, use a lock to ensure only one process creates the table. */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	/* Be sure any local memory allocated by DSA routines is persistent. */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for last-start times. */
+		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+		dsa_pin(last_start_times_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+		/* Store handles in shared memory for other backends to use. */
+		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+	}
+	else if (!last_start_times)
+	{
+		/* Attach to existing dynamic shared hash table. */
+		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+										 LogicalRepCtx->last_start_dsh, 0);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+	LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
+{
+	LauncherLastStartTimesEntry *entry;
+	bool		found;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find_or_insert(last_start_times, &subid, &found);
+	entry->last_start_time = start_time;
+	dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+ApplyLauncherGetWorkerStartTime(Oid subid)
+{
+	LauncherLastStartTimesEntry *entry;
+	TimestampTz ret;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find(last_start_times, &subid, false);
+	if (entry == NULL)
+		return 0;
+
+	ret = entry->last_start_time;
+	dshash_release_lock(last_start_times, entry);
+
+	return ret;
+}
+
+/*
+ * Remove the last-start-time entry for the subscription, if one exists.
+ *
+ * This has two use-cases: to remove the entry related to a subscription
+ * that's been deleted or disabled (just to avoid leaking shared memory),
+ * and to allow immediate restart of an apply worker that has exited
+ * due to subscription parameter changes.
+ */
+void
+ApplyLauncherForgetWorkerStartTime(Oid subid)
+{
+	logicalrep_launcher_attach_dshmem();
+
+	(void) dshash_delete_key(last_start_times, &subid);
+}
+
 /*
  * Wakeup the launcher on commit if requested.
 */
@@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg)
 		ListCell   *lc;
 		MemoryContext subctx;
 		MemoryContext oldctx;
-		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
 		CHECK_FOR_INTERRUPTS();
 
-		now = GetCurrentTimestamp();
+		/* Use temporary context to avoid leaking memory across cycles. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
+		/* Start any missing workers for enabled subscriptions. */
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
+			Subscription *sub = (Subscription *) lfirst(lc);
+			LogicalRepWorker *w;
+			TimestampTz last_start;
+			TimestampTz now;
+			long		elapsed;
 
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
+			if (!sub->enabled)
+				continue;
 
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-				if (!sub->enabled)
-					continue;
+			if (w != NULL)
+				continue;		/* worker is running already */
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
-
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
-
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid, DSM_HANDLE_INVALID);
-				}
-			}
-
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
-		}
-		else
-		{
 			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
+			 * If the worker is eligible to start now, launch it.  Otherwise,
+			 * adjust wait_time so that we'll wake up as soon as it can be
+			 * started.
+			 *
+			 * Each subscription's apply worker can only be restarted once per
+			 * wal_retrieve_retry_interval, so that errors do not cause us to
+			 * repeatedly restart the worker as fast as possible.  In cases
+			 * where a restart is expected (e.g., subscription parameter
+			 * changes), another process should remove the last-start entry
+			 * for the subscription so that the worker can be restarted
+			 * without waiting for wal_retrieve_retry_interval to elapse.
			 */
-			wait_time = wal_retrieve_retry_interval;
+			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+			now = GetCurrentTimestamp();
+			if (last_start == 0 ||
+				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+			{
+				ApplyLauncherSetWorkerStartTime(sub->oid, now);
+				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+										 sub->owner, InvalidOid,
+										 DSM_HANDLE_INVALID);
+			}
+			else
+			{
+				wait_time = Min(wait_time,
+								wal_retrieve_retry_interval - elapsed);
+			}
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce71296..4647837b823 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	}
 
 	if (should_exit)
+	{
+		/*
+		 * Reset the last-start time for this worker so that the launcher will
+		 * restart it without waiting for wal_retrieve_retry_interval.
+		 */
+		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
+
 		proc_exit(0);
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a0084c7ef69..cfb2ab62481 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -174,6 +174,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3811,6 +3812,15 @@ apply_worker_exit(void)
 		return;
 	}
 
+	/*
+	 * Reset the last-start time for this apply worker so that the launcher
+	 * will restart it without waiting for wal_retrieve_retry_interval if the
+	 * subscription is still active, and so that we won't leak that hash table
+	 * entry if it isn't.
+	 */
+	if (!am_tablesync_worker())
+		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
 	proc_exit(0);
 }
 
@@ -3851,6 +3861,9 @@ maybe_reread_subscription(void)
 				(errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
 						get_worker_name(), MySubscription->name)));
 
+		/* Ensure we remove no-longer-useful entry for worker's start time */
+		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -4421,6 +4434,9 @@ InitializeApplyWorker(void)
 				(errmsg("%s for subscription %u will not start because the subscription was removed during startup",
 						get_worker_name(), MyLogicalRepWorker->subid)));
 
+		/* Ensure we remove no-longer-useful entry for worker's start time */
+		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void)
 	DisableSubscription(MySubscription->oid);
 	CommitTransactionCommand();
 
+	/* Ensure we remove no-longer-useful entry for worker's start time */
+	if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
 	/* Notify the subscription has been disabled and exit */
 	ereport(LOG,
 			errmsg("subscription \"%s\" has been disabled because of an error",
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3d..d2ec3960451 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_LAUNCHER_DSA: */
+	"LogicalRepLauncherDSA",
+	/* LWTRANCHE_LAUNCHER_HASH: */
+	"LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 360e98702a8..a07c9cb311a 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg);
 extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
+extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
+
 extern void ApplyLauncherWakeupAtCommit(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613c..d2c7afb8f40 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_LAUNCHER_DSA,
+	LWTRANCHE_LAUNCHER_HASH,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
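
A note for readers of this excerpt: the launcher.c hunk that declares the shared-memory handles and hash-table parameters is not reproduced above. Based on the symbols the new code references (LogicalRepCtx->last_start_dsa, LogicalRepCtx->last_start_dsh, dsh_params, LauncherLastStartTimesEntry, last_start_times_dsa, last_start_times), the missing declarations presumably look roughly like the following sketch; treat it as an illustration of the data structures rather than the patch's exact text:

/* In LogicalRepCtxStruct (main shared memory), alongside launcher_pid: */
dsa_handle	last_start_dsa;		/* DSA area holding the hash table */
dshash_table_handle last_start_dsh; /* the hash table within that area */

/* An entry of the last-start-times hash table, keyed by subscription OID. */
typedef struct LauncherLastStartTimesEntry
{
	Oid			subid;			/* subscription OID (hash key) */
	TimestampTz last_start_time;	/* last apply-worker start for subid */
} LauncherLastStartTimesEntry;

/* Hash-table parameters; the tranche IDs match the lwlock.h additions. */
static const dshash_parameters dsh_params = {
	sizeof(Oid),
	sizeof(LauncherLastStartTimesEntry),
	dshash_memcmp,
	dshash_memhash,
	LWTRANCHE_LAUNCHER_HASH
};

/* Backend-local attachments, set up by logicalrep_launcher_attach_dshmem(). */
static dsa_area *last_start_times_dsa = NULL;
static dshash_table *last_start_times = NULL;

A dshash table in a DSA segment is used rather than a fixed-size array in the main shared-memory segment because the set of subscriptions is unbounded and can change at any time; the area is created lazily on first use and pinned so that it outlives the creating process.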
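
The effect of the new wait_time computation can also be seen in isolation. The following self-contained C program mirrors the launcher's per-subscription decision; the values are hypothetical, Min() mimics PostgreSQL's macro, and a negative elapsed value stands in for "no last-start entry":

#include <stdio.h>

#define Min(x, y) ((x) < (y) ? (x) : (y))

/* Stand-in for the launcher's default nap time (milliseconds). */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

/*
 * Returns the adjusted wait_time for one subscription, and sets *launch
 * if its apply worker may be (re)started now.
 */
static long
decide(long wait_time, long elapsed_ms, long retry_interval_ms, int *launch)
{
	if (elapsed_ms < 0 || elapsed_ms >= retry_interval_ms)
	{
		*launch = 1;			/* eligible: start it and record "now" */
		return wait_time;
	}
	*launch = 0;				/* not eligible: wake when it will be */
	return Min(wait_time, retry_interval_ms - elapsed_ms);
}

int
main(void)
{
	int			launch;
	long		wait;

	/* Worker last started 2000 ms ago, interval 5000 ms: sleep 3000 ms. */
	wait = decide(DEFAULT_NAPTIME_PER_CYCLE, 2000, 5000, &launch);
	printf("launch=%d wait=%ldms\n", launch, wait);	/* launch=0 wait=3000ms */

	/* No previous start recorded: launch immediately. */
	wait = decide(DEFAULT_NAPTIME_PER_CYCLE, -1, 5000, &launch);
	printf("launch=%d wait=%ldms\n", launch, wait);	/* launch=1 wait=180000ms */

	return 0;
}

With wal_retrieve_retry_interval = 5000 ms and a worker that last started 2000 ms ago, nothing is launched and the launcher sleeps min(180000, 5000 - 2000) = 3000 ms, waking exactly when the worker becomes eligible. Before this patch, a single global last_start_time throttled all subscriptions together, so one recently started worker could delay every other subscription's restart.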