1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-03 09:13:20 +03:00

Add max_retention_duration option to subscriptions.

This commit introduces a new subscription parameter,
max_retention_duration, aimed at mitigating excessive accumulation of dead
tuples when retain_dead_tuples is enabled and the apply worker lags behind
the publisher.

When the time spent advancing a non-removable transaction ID exceeds the
max_retention_duration threshold, the apply worker will stop retaining
conflict detection information. In such cases, the conflict slot's xmin
will be set to InvalidTransactionId, provided that all apply workers
associated with the subscription (with retain_dead_tuples enabled) confirm
the retention duration has been exceeded.

To ensure retention status persists across server restarts, a new column
subretentionactive has been added to the pg_subscription catalog. This
prevents unnecessary reactivation of retention logic after a restart.

The conflict detection slot will not be automatically re-initialized
unless a new subscription is created with retain_dead_tuples = true, or
the user manually re-enables retain_dead_tuples.

A future patch will introduce support for automatic slot re-initialization
once at least one apply worker confirms that the retention duration is
within the configured max_retention_duration.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila
2025-09-02 03:20:18 +00:00
parent 36aed19fd9
commit a850be2fe6
20 changed files with 777 additions and 216 deletions

View File

@@ -43,6 +43,7 @@
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
@@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
static void advance_conflict_slot_xmin(TransactionId new_xmin);
static void update_conflict_slot_xmin(TransactionId new_xmin);
static void init_conflict_slot_xmin(void);
/*
@@ -152,6 +154,7 @@ get_subscription_list(void)
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
sub->retaindeadtuples = subform->subretaindeadtuples;
sub->retentionactive = subform->subretentionactive;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
bool can_advance_xmin = true;
bool can_update_xmin = true;
bool retain_dead_tuples = false;
TransactionId xmin = InvalidTransactionId;
@@ -1214,17 +1217,6 @@ ApplyLauncherMain(Datum main_arg)
{
retain_dead_tuples = true;
/*
* Can't advance xmin of the slot unless all the subscriptions
* with retain_dead_tuples are enabled. This is required to
* ensure that we don't advance the xmin of
* CONFLICT_DETECTION_SLOT if one of the subscriptions is not
* enabled. Otherwise, we won't be able to detect conflicts
* reliably for such a subscription even though it has set the
* retain_dead_tuples option.
*/
can_advance_xmin &= sub->enabled;
/*
* Create a replication slot to retain information necessary
* for conflict detection such as dead tuples, commit
@@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg)
* subscription was enabled.
*/
CreateConflictDetectionSlot();
if (sub->retentionactive)
{
/*
* Can't advance xmin of the slot unless all the
* subscriptions actively retaining dead tuples are
* enabled. This is required to ensure that we don't
* advance the xmin of CONFLICT_DETECTION_SLOT if one of
* the subscriptions is not enabled. Otherwise, we won't
* be able to detect conflicts reliably for such a
* subscription even though it has set the
* retain_dead_tuples option.
*/
can_update_xmin &= sub->enabled;
/*
* Initialize the slot once the subscription activiates
* retention.
*/
if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
init_conflict_slot_xmin();
}
}
if (!sub->enabled)
@@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg)
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
* workers that enables retain_dead_tuples.
* workers.
*/
if (sub->retaindeadtuples && can_advance_xmin)
if (sub->retaindeadtuples &&
sub->retentionactive &&
can_update_xmin)
compute_min_nonremovable_xid(w, &xmin);
/* worker is running already */
@@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg)
/*
* Can't advance xmin of the slot unless all the workers
* corresponding to subscriptions with retain_dead_tuples are
* running, disabling the further computation of the minimum
* corresponding to subscriptions actively retaining dead tuples
* are running, disabling the further computation of the minimum
* nonremovable xid.
*/
if (sub->retaindeadtuples)
can_advance_xmin = false;
if (sub->retaindeadtuples && sub->retentionactive)
can_update_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg)
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID,
sub->retaindeadtuples))
sub->retaindeadtuples &&
sub->retentionactive))
{
/*
* We get here either if we failed to launch a worker
@@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg)
* that requires us to retain dead tuples. Otherwise, if required,
* advance the slot's xmin to protect dead tuples required for the
* conflict detection.
*
* Additionally, if all apply workers for subscriptions with
* retain_dead_tuples enabled have requested to stop retention, the
* slot's xmin will be set to InvalidTransactionId allowing the
* removal of dead tuples.
*/
if (MyReplicationSlot)
{
if (!retain_dead_tuples)
ReplicationSlotDropAcquired();
else if (can_advance_xmin)
advance_conflict_slot_xmin(xmin);
else if (can_update_xmin)
update_conflict_slot_xmin(xmin);
}
/* Switch back to original memory context. */
@@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
nonremovable_xid = worker->oldest_nonremovable_xid;
SpinLockRelease(&worker->relmutex);
Assert(TransactionIdIsValid(nonremovable_xid));
/*
* Return if the apply worker has stopped retention concurrently.
*
* Although this function is invoked only when retentionactive is true,
* the apply worker might stop retention after the launcher fetches the
* retentionactive flag.
*/
if (!TransactionIdIsValid(nonremovable_xid))
return;
if (!TransactionIdIsValid(*xmin) ||
TransactionIdPrecedes(nonremovable_xid, *xmin))
@@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void)
}
/*
* Advance the xmin the replication slot used to retain information required
* Update the xmin the replication slot used to retain information required
* for conflict detection.
*/
static void
advance_conflict_slot_xmin(TransactionId new_xmin)
update_conflict_slot_xmin(TransactionId new_xmin)
{
Assert(MyReplicationSlot);
Assert(TransactionIdIsValid(new_xmin));
Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
Assert(!TransactionIdIsValid(new_xmin) ||
TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
/* Return if the xmin value of the slot cannot be advanced */
/* Return if the xmin value of the slot cannot be updated */
if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
return;
@@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin)
}
/*
* Create and acquire the replication slot used to retain information for
* conflict detection, if not yet.
* Initialize the xmin for the conflict detection slot.
*/
void
CreateConflictDetectionSlot(void)
static void
init_conflict_slot_xmin(void)
{
TransactionId xmin_horizon;
/* Exit early, if the replication slot is already created and acquired */
if (MyReplicationSlot)
return;
ereport(LOG,
errmsg("creating replication conflict detection slot"));
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
false, false);
/* Replication slot must exist but shouldn't be initialized. */
Assert(MyReplicationSlot &&
!TransactionIdIsValid(MyReplicationSlot->data.xmin));
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -1475,6 +1498,26 @@ CreateConflictDetectionSlot(void)
ReplicationSlotSave();
}
/*
* Create and acquire the replication slot used to retain information for
* conflict detection, if not yet.
*/
void
CreateConflictDetectionSlot(void)
{
/* Exit early, if the replication slot is already created and acquired */
if (MyReplicationSlot)
return;
ereport(LOG,
errmsg("creating replication conflict detection slot"));
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
false, false);
init_conflict_slot_xmin();
}
/*
* Is current process the logical replication launcher?
*/