1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-16 17:07:43 +03:00

Resume conflict-relevant data retention automatically.

This commit makes retention of conflict-relevant data for a subscription
resume automatically. Previously, retention would stop if the apply process
failed to advance its xmin (oldest_nonremovable_xid) within the configured
max_retention_duration, and the user had to manually re-enable the
retain_dead_tuples option. With this change, retention resumes
automatically once the apply worker catches up and begins advancing its
xmin (oldest_nonremovable_xid) within the configured threshold.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila
2025-09-15 08:44:54 +00:00
parent 282d0bdee6
commit 0d48d393d4
4 changed files with 196 additions and 50 deletions

View File

@@ -538,10 +538,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<literal>retain_dead_tuples</literal> is enabled, confirm that the <literal>retain_dead_tuples</literal> is enabled, confirm that the
retention duration has exceeded the retention duration has exceeded the
<literal>max_retention_duration</literal> set within the corresponding <literal>max_retention_duration</literal> set within the corresponding
subscription. The retention will not be automatically resumed unless a subscription. The retention will automatically resume when at least one
new subscription is created with <literal>retain_dead_tuples = apply worker confirms that the retention duration is within the
true</literal>, or the user manually re-enables specified limit, or when a new subscription is created with
<literal>retain_dead_tuples</literal>. <literal>retain_dead_tuples = true</literal>. Alternatively, retention
can be manually resumed by re-enabling <literal>retain_dead_tuples</literal>.
</para> </para>
<para> <para>
Note that overall retention will not stop if other subscriptions that Note that overall retention will not stop if other subscriptions that

View File

@@ -1261,24 +1261,30 @@ ApplyLauncherMain(Datum main_arg)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
w = logicalrep_worker_find(sub->oid, InvalidOid, false); w = logicalrep_worker_find(sub->oid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (w != NULL) if (w != NULL)
{ {
/* /*
* Compute the minimum xmin required to protect dead tuples * Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply * required for conflict detection among all running apply
* workers. * workers. This computation is performed while holding
* LogicalRepWorkerLock to prevent accessing invalid worker
* data, in scenarios where a worker might exit and reset its
* state concurrently.
*/ */
if (sub->retaindeadtuples && if (sub->retaindeadtuples &&
sub->retentionactive && sub->retentionactive &&
can_update_xmin) can_update_xmin)
compute_min_nonremovable_xid(w, &xmin); compute_min_nonremovable_xid(w, &xmin);
LWLockRelease(LogicalRepWorkerLock);
/* worker is running already */ /* worker is running already */
continue; continue;
} }
LWLockRelease(LogicalRepWorkerLock);
/* /*
* Can't advance xmin of the slot unless all the workers * Can't advance xmin of the slot unless all the workers
* corresponding to subscriptions actively retaining dead tuples * corresponding to subscriptions actively retaining dead tuples

View File

@@ -181,6 +181,15 @@
* pg_subscription.subretentionactive is updated to false within a new * pg_subscription.subretentionactive is updated to false within a new
* transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
* *
* - RDT_RESUME_CONFLICT_INFO_RETENTION:
* This phase is required only when max_retention_duration is defined. We
* enter this phase if the retention was previously stopped, and the time
* required to advance the non-removable transaction ID in the
* RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
* (or if max_retention_duration is set to 0). During this phase,
* pg_subscription.subretentionactive is updated to true within a new
* transaction, and the worker will be restarted.
*
* The overall state progression is: GET_CANDIDATE_XID -> * The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -381,7 +390,8 @@ typedef enum
RDT_REQUEST_PUBLISHER_STATUS, RDT_REQUEST_PUBLISHER_STATUS,
RDT_WAIT_FOR_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS,
RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_LOCAL_FLUSH,
RDT_STOP_CONFLICT_INFO_RETENTION RDT_STOP_CONFLICT_INFO_RETENTION,
RDT_RESUME_CONFLICT_INFO_RETENTION,
} RetainDeadTuplesPhase; } RetainDeadTuplesPhase;
/* /*
@@ -568,10 +578,14 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static bool update_retention_status(bool active);
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
bool new_xid_found); bool new_xid_found);
static void apply_worker_exit(void);
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata, static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo, ResultRelInfo *relinfo,
@@ -4367,10 +4381,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
if (!MySubscription->retaindeadtuples) if (!MySubscription->retaindeadtuples)
return false; return false;
/* No need to advance if we have already stopped retaining */
if (!MySubscription->retentionactive)
return false;
return true; return true;
} }
@@ -4399,6 +4409,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
case RDT_STOP_CONFLICT_INFO_RETENTION: case RDT_STOP_CONFLICT_INFO_RETENTION:
stop_conflict_info_retention(rdt_data); stop_conflict_info_retention(rdt_data);
break; break;
case RDT_RESUME_CONFLICT_INFO_RETENTION:
resume_conflict_info_retention(rdt_data);
break;
} }
} }
@@ -4522,7 +4535,10 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
* retaining conflict information for this worker. * retaining conflict information for this worker.
*/ */
if (should_stop_conflict_info_retention(rdt_data)) if (should_stop_conflict_info_retention(rdt_data))
{
rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
return; return;
}
if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
rdt_data->remote_wait_for = rdt_data->remote_nextxid; rdt_data->remote_wait_for = rdt_data->remote_nextxid;
@@ -4643,7 +4659,10 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* retaining conflict information for this worker. * retaining conflict information for this worker.
*/ */
if (should_stop_conflict_info_retention(rdt_data)) if (should_stop_conflict_info_retention(rdt_data))
{
rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
return; return;
}
/* /*
* Update and check the remote flush position if we are applying changes * Update and check the remote flush position if we are applying changes
@@ -4672,6 +4691,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
if (last_flushpos < rdt_data->remote_lsn) if (last_flushpos < rdt_data->remote_lsn)
return; return;
/*
* Reaching this point implies should_stop_conflict_info_retention()
* returned false earlier, meaning that the most recent duration for
* advancing the non-removable transaction ID is within the
* max_retention_duration or max_retention_duration is set to 0.
*
* Therefore, if conflict info retention was previously stopped due to a
* timeout, it is now safe to resume retention.
*/
if (!MySubscription->retentionactive)
{
rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
return;
}
/* /*
* Reaching here means the remote WAL position has been received, and all * Reaching here means the remote WAL position has been received, and all
* transactions up to that position on the publisher have been applied and * transactions up to that position on the publisher have been applied and
@@ -4698,13 +4732,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* Check whether conflict information retention should be stopped due to * Check whether conflict information retention should be stopped due to
* exceeding the maximum wait time (max_retention_duration). * exceeding the maximum wait time (max_retention_duration).
* *
* If retention should be stopped, transition to the * If retention should be stopped, return true. Otherwise, return false.
* RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
* false.
*
* Note: Retention won't be resumed automatically. The user must manually
* disable retain_dead_tuples and re-enable it after confirming that the
* replication slot maintained by the launcher has been dropped.
*/ */
static bool static bool
should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4735,11 +4763,6 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
rdt_data->table_sync_wait_time)) rdt_data->table_sync_wait_time))
return false; return false;
rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
/* process the next phase */
process_rdt_phase_transition(rdt_data, false);
return true; return true;
} }
@@ -4748,6 +4771,86 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
*/ */
static void static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
/* Stop retention if it has not been stopped already */
if (MySubscription->retentionactive)
{
/*
* If the retention status cannot be updated (e.g., due to active
* transaction), skip further processing to avoid inconsistent
* retention behavior.
*/
if (!update_retention_status(false))
return;
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
ereport(LOG,
errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
MySubscription->name),
errdetail("Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms.",
MySubscription->maxretention));
}
Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
/*
* If retention has been stopped, reset to the initial phase to retry
* resuming retention. This reset is required to recalculate the current
* wait time and resume retention if the time falls within
* max_retention_duration.
*/
reset_retention_data_fields(rdt_data);
}
/*
 * Handler for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
 *
 * Marks retention as active again in pg_subscription, logs the reason, and
 * then exits the apply worker so that it gets restarted. Does nothing if the
 * retention status could not be updated.
 */
static void
resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* Resuming is only possible once the retention status has been updated. */
	if (update_retention_status(true))
	{
		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
					   MySubscription->name),
				(MySubscription->maxretention == 0)
				? errdetail("Retention is re-enabled as max_retention_duration is set to unlimited.")
				: errdetail("Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms.",
							MySubscription->maxretention));

		/*
		 * Exit so that the worker is restarted and the launcher can
		 * initialize oldest_nonremovable_xid at startup.
		 *
		 * Deriving the value on-the-fly from the conflict detection slot's
		 * xmin is technically possible, but it is exposed to a race: the
		 * launcher might clean slot.xmin right after retention resumes,
		 * leaving oldest_nonremovable_xid unreliable, especially during xid
		 * wraparound.
		 *
		 * Heavyweight locking could prevent that, but the added complexity
		 * does not seem worthwhile given how rarely retention is resumed.
		 */
		apply_worker_exit();
	}
}
/*
* Updates pg_subscription.subretentionactive to the given value within a
* new transaction.
*
* If already inside an active transaction, skips the update and returns
* false.
*
* Returns true if the update is successfully performed.
*/
static bool
update_retention_status(bool active)
{ {
/* /*
* Do not update the catalog during an active transaction. The transaction * Do not update the catalog during an active transaction. The transaction
@@ -4755,7 +4858,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
* rollback of catalog updates if the application fails subsequently. * rollback of catalog updates if the application fails subsequently.
*/ */
if (IsTransactionState()) if (IsTransactionState())
return; return false;
StartTransactionCommand(); StartTransactionCommand();
@@ -4765,26 +4868,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
*/ */
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* Set pg_subscription.subretentionactive to false */ /* Update pg_subscription.subretentionactive */
UpdateDeadTupleRetentionStatus(MySubscription->oid, false); UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
ereport(LOG,
errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
MySubscription->name),
errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
MySubscription->maxretention));
/* Notify launcher to update the conflict slot */ /* Notify launcher to update the conflict slot */
ApplyLauncherWakeup(); ApplyLauncherWakeup();
reset_retention_data_fields(rdt_data); MySubscription->retentionactive = active;
return true;
} }
/* /*
@@ -4809,19 +4904,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
/* /*
* Adjust the interval for advancing non-removable transaction IDs. * Adjust the interval for advancing non-removable transaction IDs.
* *
* If there is no activity on the node, we progressively double the interval * If there is no activity on the node or retention has been stopped, we
* used to advance non-removable transaction ID. This helps conserve CPU * progressively double the interval used to advance non-removable transaction
* and network resources when there's little benefit to frequent updates. * ID. This helps conserve CPU and network resources when there's little benefit
* to frequent updates.
* *
* The interval is capped by the lowest of the following: * The interval is capped by the lowest of the following:
* - wal_receiver_status_interval (if set), * - wal_receiver_status_interval (if set and retention is active),
* - a default maximum of 3 minutes, * - a default maximum of 3 minutes,
* - max_retention_duration. * - max_retention_duration (if retention is active).
* *
* This ensures the interval never exceeds the retention boundary, even if * This ensures the interval never exceeds the retention boundary, even if other
* other limits are higher. Once activity resumes on the node, the interval * limits are higher. Once activity resumes on the node and the retention is
* is reset to lesser of 100ms and max_retention_duration, allowing timely * active, the interval is reset to lesser of 100ms and max_retention_duration,
* advancement of non-removable transaction ID. * allowing timely advancement of non-removable transaction ID.
* *
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises. * consider the other interval or a separate GUC if the need arises.
@@ -4829,7 +4925,7 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
static void static void
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
{ {
if (!new_xid_found && rdt_data->xid_advance_interval) if (rdt_data->xid_advance_interval && !new_xid_found)
{ {
int max_interval = wal_receiver_status_interval int max_interval = wal_receiver_status_interval
? wal_receiver_status_interval * 1000 ? wal_receiver_status_interval * 1000
@@ -4842,6 +4938,18 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
max_interval); max_interval);
} }
else if (rdt_data->xid_advance_interval &&
!MySubscription->retentionactive)
{
/*
* Retention has been stopped, so double the interval, capped at a
* maximum of 3 minutes. The wal_receiver_status_interval is
* intentionally not used as an upper bound, since the likelihood of
* retention resuming is lower than that of general activity resuming.
*/
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
MAX_XID_ADVANCE_INTERVAL);
}
else else
{ {
/* /*
@@ -4851,9 +4959,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
} }
/* Ensure the wait time remains within the maximum limit */ /*
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, * Ensure the wait time remains within the maximum retention time limit
MySubscription->maxretention); * when retention is active.
*/
if (MySubscription->retentionactive)
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
MySubscription->maxretention);
} }
/* /*

View File

@@ -631,6 +631,33 @@ $node_B->safe_psql('postgres',
$node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); $node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
$node_B->reload; $node_B->reload;
###############################################################################
# Check that dead tuple retention resumes when the max_retention_duration is set
# to 0.
###############################################################################
$log_offset = -s $node_A->logfile;
# Set max_retention_duration to 0
$node_A->safe_psql('postgres',
"ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 0);");
# Confirm that the retention resumes
$node_A->wait_for_log(
qr/logical replication worker for subscription "tap_sub_a_b" will resume retaining the information for detecting conflicts
.*DETAIL:.* Retention is re-enabled as max_retention_duration is set to unlimited.*/,
$log_offset);
ok( $node_A->poll_query_until(
'postgres',
"SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
),
"the xmin value of slot 'pg_conflict_detection' is valid on Node A");
$result = $node_A->safe_psql('postgres',
"SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
is($result, qq(t), 'retention is active');
############################################################################### ###############################################################################
# Check that the replication slot pg_conflict_detection is dropped after # Check that the replication slot pg_conflict_detection is dropped after
# removing all the subscriptions. # removing all the subscriptions.