mirror of
https://github.com/postgres/postgres.git
Track logrep apply workers' last start times to avoid useless waits.
Enforce wal_retrieve_retry_interval on a per-subscription basis, rather than globally, and arrange to skip that delay in case of an intentional worker exit. This probably makes little difference in the field, where apply workers wouldn't be restarted often; but it has a significant impact on the runtime of our logical replication regression tests (even though those tests use artificially-small wal_retrieve_retry_interval settings already).

Nathan Bossart, with mostly-cosmetic editorialization by me

Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13
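Before the diff itself, a minimal standalone sketch (plain C, not PostgreSQL backend code) of the behaviour the commit message describes may help. Everything in it (the fixed-size table, lookup(), forget(), consider_launch(), and the two constants) is invented for illustration; in the actual patch the table is a dshash table in dynamic shared memory keyed by subscription OID, and the decision is made by the launcher in ApplyLauncherMain() further down.

#include <stdio.h>

#define MAX_SUBS 8
#define RETRY_INTERVAL_MS 5000		/* stands in for wal_retrieve_retry_interval */
#define NAPTIME_MS 180000			/* stands in for DEFAULT_NAPTIME_PER_CYCLE */

typedef struct LastStartEntry
{
	unsigned int subid;				/* 0 means "slot unused" */
	long long	last_start_ms;		/* last time this subscription's worker started */
} LastStartEntry;

static LastStartEntry table[MAX_SUBS];

static LastStartEntry *
lookup(unsigned int subid, int create)
{
	for (int i = 0; i < MAX_SUBS; i++)
		if (table[i].subid == subid)
			return &table[i];
	if (!create)
		return NULL;
	for (int i = 0; i < MAX_SUBS; i++)
		if (table[i].subid == 0)
		{
			table[i].subid = subid;
			table[i].last_start_ms = 0;
			return &table[i];
		}
	return NULL;
}

/* Forget the entry so the next launch is not delayed (intentional exit). */
static void
forget(unsigned int subid)
{
	LastStartEntry *e = lookup(subid, 0);

	if (e)
		e->subid = 0;
}

/*
 * Decide whether the subscription's worker may be (re)started at now_ms.
 * If yes, record the start time and return 1; if not, shrink *wait_ms so
 * the launcher wakes up as soon as the worker becomes eligible.
 */
static int
consider_launch(unsigned int subid, long long now_ms, long long *wait_ms)
{
	LastStartEntry *e = lookup(subid, 1);
	long long	elapsed;

	if (e->last_start_ms == 0 ||
		(elapsed = now_ms - e->last_start_ms) >= RETRY_INTERVAL_MS)
	{
		e->last_start_ms = now_ms;
		return 1;
	}
	if (RETRY_INTERVAL_MS - elapsed < *wait_ms)
		*wait_ms = RETRY_INTERVAL_MS - elapsed;
	return 0;
}

int
main(void)
{
	long long	wait_ms = NAPTIME_MS;

	printf("t=10s: start sub 16390's worker? %d\n",
		   consider_launch(16390, 10000, &wait_ms));
	printf("t=11s: restart after a crash?    %d (launcher sleeps %lld ms)\n",
		   consider_launch(16390, 11000, &wait_ms), wait_ms);
	forget(16390);				/* e.g. intentional exit after a parameter change */
	printf("t=11s: restart after forget?     %d\n",
		   consider_launch(16390, 11000, &wait_ms));
	return 0;
}

The third call is the point of the commit: deleting the entry lets an intentional restart bypass the wal_retrieve_retry_interval throttle that otherwise applies to a failing worker, and the throttle itself is now tracked per subscription rather than by a single launcher-wide timestamp.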
doc/src/sgml/config.sgml

@@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         environments where the number of times an infrastructure is accessed
         is taken into account.
        </para>
+
+       <para>
+        In logical replication, this parameter also limits how often a failing
+        replication apply worker will be respawned.
+       </para>
       </listitem>
      </varlistentry>
doc/src/sgml/monitoring.sgml

@@ -2008,6 +2008,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
      <entry>Waiting to read or update information
       about <quote>heavyweight</quote> locks.</entry>
     </row>
+    <row>
+     <entry><literal>LogicalRepLauncherDSA</literal></entry>
+     <entry>Waiting to access logical replication launcher's dynamic shared
+      memory allocator.</entry>
+    </row>
+    <row>
+     <entry><literal>LogicalRepLauncherHash</literal></entry>
+     <entry>Waiting to access logical replication launcher's shared
+      hash table.</entry>
+    </row>
     <row>
      <entry><literal>LogicalRepWorker</literal></entry>
      <entry>Waiting to read or update the state of logical replication
src/backend/commands/subscriptioncmds.c

@@ -1504,6 +1504,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Remove the no-longer-useful entry in the launcher's table of apply
+	 * worker start times.
+	 *
+	 * If this transaction rolls back, the launcher might restart a failed
+	 * apply worker before wal_retrieve_retry_interval milliseconds have
+	 * elapsed, but that's pretty harmless.
+	 */
+	ApplyLauncherForgetWorkerStartTime(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
src/backend/replication/logical/launcher.c

@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* Hash table holding last start times of subscriptions' apply workers. */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
 
 static LogicalRepCtxStruct *LogicalRepCtx;
 
+/* an entry in the last-start-times shared hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid			subid;			/* OID of logrep subscription (hash key) */
+	TimestampTz last_start_time;	/* last time its apply worker was started */
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start-times shared hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
+static bool on_commit_launcher_wakeup = false;
+
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_launcher_attach_dshmem(void);
+static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
+static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
 
-static bool on_commit_launcher_wakeup = false;
 
 /*
@@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void)
 
 	memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
 
+	LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+	LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
+
 	/* Initialize memory and spin locks for each worker slot. */
 	for (slot = 0; slot < max_logical_replication_workers; slot++)
 	{
@@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void)
 	}
 }
 
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.
+ * This must be called before accessing the table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+	MemoryContext oldcontext;
+
+	/* Quick exit if we already did this. */
+	if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID &&
+		last_start_times != NULL)
+		return;
+
+	/* Otherwise, use a lock to ensure only one process creates the table. */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+	/* Be sure any local memory allocated by DSA routines is persistent. */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for last-start times. */
+		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+		dsa_pin(last_start_times_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+		/* Store handles in shared memory for other backends to use. */
+		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+	}
+	else if (!last_start_times)
+	{
+		/* Attach to existing dynamic shared hash table. */
+		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+		dsa_pin_mapping(last_start_times_dsa);
+		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+										 LogicalRepCtx->last_start_dsh, 0);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+	LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
+{
+	LauncherLastStartTimesEntry *entry;
+	bool		found;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find_or_insert(last_start_times, &subid, &found);
+	entry->last_start_time = start_time;
+	dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+ApplyLauncherGetWorkerStartTime(Oid subid)
+{
+	LauncherLastStartTimesEntry *entry;
+	TimestampTz ret;
+
+	logicalrep_launcher_attach_dshmem();
+
+	entry = dshash_find(last_start_times, &subid, false);
+	if (entry == NULL)
+		return 0;
+
+	ret = entry->last_start_time;
+	dshash_release_lock(last_start_times, entry);
+
+	return ret;
+}
+
+/*
+ * Remove the last-start-time entry for the subscription, if one exists.
+ *
+ * This has two use-cases: to remove the entry related to a subscription
+ * that's been deleted or disabled (just to avoid leaking shared memory),
+ * and to allow immediate restart of an apply worker that has exited
+ * due to subscription parameter changes.
+ */
+void
+ApplyLauncherForgetWorkerStartTime(Oid subid)
+{
+	logicalrep_launcher_attach_dshmem();
+
+	(void) dshash_delete_key(last_start_times, &subid);
+}
+
 /*
  * Wakeup the launcher on commit if requested.
  */
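The create-or-attach logic in logicalrep_launcher_attach_dshmem() above is easier to see in a toy model. The sketch below uses plain C with pthreads rather than PostgreSQL's dsa/dshash machinery: the mutex and the shared pointer stand in for LogicalRepWorkerLock and the handles kept in LogicalRepCtx, and the thread-local attached_table plays the role of the process-local last_start_times pointer. All names in it are invented.

/*
 * Toy model of the create-or-attach pattern: the first caller creates the
 * shared table and publishes it; every caller caches its own attachment so
 * later calls return immediately.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct SharedState
{
	pthread_mutex_t lock;		/* stands in for LogicalRepWorkerLock */
	int		   *table;			/* stands in for the published handle; NULL until created */
} SharedState;

static SharedState shared = {PTHREAD_MUTEX_INITIALIZER, NULL};

/* per-thread cached attachment, like the file-local last_start_times pointer */
static __thread int *attached_table = NULL;

static int *
attach_table(void)
{
	/* Quick exit if we already did this. */
	if (attached_table != NULL)
		return attached_table;

	pthread_mutex_lock(&shared.lock);
	if (shared.table == NULL)
		shared.table = calloc(16, sizeof(int)); /* first comer creates it */
	attached_table = shared.table;	/* everyone caches a local attachment */
	pthread_mutex_unlock(&shared.lock);

	return attached_table;
}

static void *
worker(void *arg)
{
	long		slot = (long) arg;

	attach_table()[slot] = (int) slot * 10;
	return NULL;
}

int
main(void)
{
	pthread_t	th[4];

	for (long i = 0; i < 4; i++)
		pthread_create(&th[i], NULL, worker, (void *) i);
	for (int i = 0; i < 4; i++)
		pthread_join(th[i], NULL);

	for (int i = 0; i < 4; i++)
		printf("slot %d = %d\n", i, attach_table()[i]);
	return 0;
}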
@@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -976,31 +1104,25 @@ ApplyLauncherMain(Datum main_arg)
 		ListCell   *lc;
 		MemoryContext subctx;
 		MemoryContext oldctx;
-		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
 		CHECK_FOR_INTERRUPTS();
 
-		now = GetCurrentTimestamp();
-
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
+		/* Use temporary context to avoid leaking memory across cycles. */
 		subctx = AllocSetContextCreate(TopMemoryContext,
 									   "Logical Replication Launcher sublist",
 									   ALLOCSET_DEFAULT_SIZES);
 		oldctx = MemoryContextSwitchTo(subctx);
 
-		/* search for subscriptions to start or stop. */
+		/* Start any missing workers for enabled subscriptions. */
 		sublist = get_subscription_list();
-
-		/* Start the missing workers for enabled subscriptions. */
 		foreach(lc, sublist)
 		{
 			Subscription *sub = (Subscription *) lfirst(lc);
 			LogicalRepWorker *w;
+			TimestampTz last_start;
+			TimestampTz now;
+			long		elapsed;
 
 			if (!sub->enabled)
 				continue;

@@ -1009,13 +1131,36 @@ ApplyLauncherMain(Datum main_arg)
 			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
 			LWLockRelease(LogicalRepWorkerLock);
 
-			if (w == NULL)
-			{
-				last_start_time = now;
-				wait_time = wal_retrieve_retry_interval;
+			if (w != NULL)
+				continue;		/* worker is running already */
 
+			/*
+			 * If the worker is eligible to start now, launch it.  Otherwise,
+			 * adjust wait_time so that we'll wake up as soon as it can be
+			 * started.
+			 *
+			 * Each subscription's apply worker can only be restarted once per
+			 * wal_retrieve_retry_interval, so that errors do not cause us to
+			 * repeatedly restart the worker as fast as possible.  In cases
+			 * where a restart is expected (e.g., subscription parameter
+			 * changes), another process should remove the last-start entry
+			 * for the subscription so that the worker can be restarted
+			 * without waiting for wal_retrieve_retry_interval to elapse.
+			 */
+			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+			now = GetCurrentTimestamp();
+			if (last_start == 0 ||
+				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+			{
+				ApplyLauncherSetWorkerStartTime(sub->oid, now);
 				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-										 sub->owner, InvalidOid, DSM_HANDLE_INVALID);
+										 sub->owner, InvalidOid,
+										 DSM_HANDLE_INVALID);
+			}
+			else
+			{
+				wait_time = Min(wait_time,
+								wal_retrieve_retry_interval - elapsed);
 			}
 		}
 

@@ -1023,17 +1168,6 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContextSwitchTo(oldctx);
 		/* Clean the temporary memory. */
 		MemoryContextDelete(subctx);
-		}
-		else
-		{
-			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
-			 */
-			wait_time = wal_retrieve_retry_interval;
-		}
 
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
src/backend/replication/logical/tablesync.c

@@ -628,8 +628,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	}
 
 	if (should_exit)
+	{
+		/*
+		 * Reset the last-start time for this worker so that the launcher will
+		 * restart it without waiting for wal_retrieve_retry_interval.
+		 */
+		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
+
 		proc_exit(0);
+	}
 }
 
 /*
  * Process possible state change(s) of tables that are being synchronized.

src/backend/replication/logical/worker.c

@@ -174,6 +174,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3811,6 +3812,15 @@ apply_worker_exit(void)
 		return;
 	}
 
+	/*
+	 * Reset the last-start time for this apply worker so that the launcher
+	 * will restart it without waiting for wal_retrieve_retry_interval if the
+	 * subscription is still active, and so that we won't leak that hash table
+	 * entry if it isn't.
+	 */
+	if (!am_tablesync_worker())
+		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
 	proc_exit(0);
 }
 
@@ -3851,6 +3861,9 @@ maybe_reread_subscription(void)
 				(errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
 						get_worker_name(), MySubscription->name)));
 
+		/* Ensure we remove no-longer-useful entry for worker's start time */
+		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -4421,6 +4434,9 @@ InitializeApplyWorker(void)
 				(errmsg("%s for subscription %u will not start because the subscription was removed during startup",
 						get_worker_name(), MyLogicalRepWorker->subid)));
 
+		/* Ensure we remove no-longer-useful entry for worker's start time */
+		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
 
@@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void)
 	DisableSubscription(MySubscription->oid);
 	CommitTransactionCommand();
 
+	/* Ensure we remove no-longer-useful entry for worker's start time */
+	if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
 	/* Notify the subscription has been disabled and exit */
 	ereport(LOG,
 			errmsg("subscription \"%s\" has been disabled because of an error",
src/backend/storage/lmgr/lwlock.c

@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_LAUNCHER_DSA: */
+	"LogicalRepLauncherDSA",
+	/* LWTRANCHE_LAUNCHER_HASH: */
+	"LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==

src/include/replication/logicallauncher.h

@@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg);
 extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
+extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
+
 extern void ApplyLauncherWakeupAtCommit(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);

src/include/storage/lwlock.h

@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_LAUNCHER_DSA,
+	LWTRANCHE_LAUNCHER_HASH,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
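The last three hunks work as a set: lwlock.h adds the tranche IDs, lwlock.c adds the matching display names (the wait event names documented in the monitoring.sgml hunk above), and the existing StaticAssertDecl in lwlock.c keeps the two lists the same length. A self-contained toy version of that pattern, with invented names, looks like this:

#include <stdio.h>

typedef enum ToyTrancheIds
{
	TOY_TRANCHE_PGSTATS_DATA,
	TOY_TRANCHE_LAUNCHER_DSA,	/* new entry, mirrors LWTRANCHE_LAUNCHER_DSA */
	TOY_TRANCHE_LAUNCHER_HASH,	/* new entry, mirrors LWTRANCHE_LAUNCHER_HASH */
	TOY_TRANCHE_FIRST_USER_DEFINED
} ToyTrancheIds;

static const char *const ToyTrancheNames[] = {
	"PgStatsData",
	"LogicalRepLauncherDSA",
	"LogicalRepLauncherHash",
};

#define lengthof(array) (sizeof(array) / sizeof((array)[0]))

/* Fails to compile if an id is added without a matching name (or vice versa). */
_Static_assert(lengthof(ToyTrancheNames) == TOY_TRANCHE_FIRST_USER_DEFINED,
			   "ToyTrancheNames must have one entry per tranche id");

int
main(void)
{
	printf("tranche %d is shown as wait event \"%s\"\n",
		   TOY_TRANCHE_LAUNCHER_HASH,
		   ToyTrancheNames[TOY_TRANCHE_LAUNCHER_HASH]);
	return 0;
}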