mirror of
https://github.com/postgres/postgres.git
synced 2025-10-22 14:32:25 +03:00
Resume conflict-relevant data retention automatically.
This commit resumes automatic retention of conflict-relevant data for a subscription. Previously, retention would stop if the apply process failed to advance its xmin (oldest_nonremovable_xid) within the configured max_retention_duration and user needs to manually re-enable retain_dead_tuples option. With this change, retention will resume automatically once the apply worker catches up and begins advancing its xmin (oldest_nonremovable_xid) within the configured threshold. Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
@@ -181,6 +181,15 @@
|
||||
* pg_subscription.subretentionactive is updated to false within a new
|
||||
* transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
|
||||
*
|
||||
* - RDT_RESUME_CONFLICT_INFO_RETENTION:
|
||||
* This phase is required only when max_retention_duration is defined. We
|
||||
* enter this phase if the retention was previously stopped, and the time
|
||||
* required to advance the non-removable transaction ID in the
|
||||
* RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
|
||||
* (or if max_retention_duration is set to 0). During this phase,
|
||||
* pg_subscription.subretentionactive is updated to true within a new
|
||||
* transaction, and the worker will be restarted.
|
||||
*
|
||||
* The overall state progression is: GET_CANDIDATE_XID ->
|
||||
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
|
||||
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
|
||||
@@ -381,7 +390,8 @@ typedef enum
|
||||
RDT_REQUEST_PUBLISHER_STATUS,
|
||||
RDT_WAIT_FOR_PUBLISHER_STATUS,
|
||||
RDT_WAIT_FOR_LOCAL_FLUSH,
|
||||
RDT_STOP_CONFLICT_INFO_RETENTION
|
||||
RDT_STOP_CONFLICT_INFO_RETENTION,
|
||||
RDT_RESUME_CONFLICT_INFO_RETENTION,
|
||||
} RetainDeadTuplesPhase;
|
||||
|
||||
/*
|
||||
@@ -568,10 +578,14 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
|
||||
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
|
||||
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
|
||||
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
|
||||
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
|
||||
static bool update_retention_status(bool active);
|
||||
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
|
||||
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
|
||||
bool new_xid_found);
|
||||
|
||||
static void apply_worker_exit(void);
|
||||
|
||||
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
|
||||
static void apply_handle_insert_internal(ApplyExecutionData *edata,
|
||||
ResultRelInfo *relinfo,
|
||||
@@ -4367,10 +4381,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
|
||||
if (!MySubscription->retaindeadtuples)
|
||||
return false;
|
||||
|
||||
/* No need to advance if we have already stopped retaining */
|
||||
if (!MySubscription->retentionactive)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -4399,6 +4409,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
|
||||
case RDT_STOP_CONFLICT_INFO_RETENTION:
|
||||
stop_conflict_info_retention(rdt_data);
|
||||
break;
|
||||
case RDT_RESUME_CONFLICT_INFO_RETENTION:
|
||||
resume_conflict_info_retention(rdt_data);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4522,7 +4535,10 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
|
||||
* retaining conflict information for this worker.
|
||||
*/
|
||||
if (should_stop_conflict_info_retention(rdt_data))
|
||||
{
|
||||
rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
|
||||
rdt_data->remote_wait_for = rdt_data->remote_nextxid;
|
||||
@@ -4643,7 +4659,10 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
|
||||
* retaining conflict information for this worker.
|
||||
*/
|
||||
if (should_stop_conflict_info_retention(rdt_data))
|
||||
{
|
||||
rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Update and check the remote flush position if we are applying changes
|
||||
@@ -4672,6 +4691,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
|
||||
if (last_flushpos < rdt_data->remote_lsn)
|
||||
return;
|
||||
|
||||
/*
|
||||
* Reaching this point implies should_stop_conflict_info_retention()
|
||||
* returned false earlier, meaning that the most recent duration for
|
||||
* advancing the non-removable transaction ID is within the
|
||||
* max_retention_duration or max_retention_duration is set to 0.
|
||||
*
|
||||
* Therefore, if conflict info retention was previously stopped due to a
|
||||
* timeout, it is now safe to resume retention.
|
||||
*/
|
||||
if (!MySubscription->retentionactive)
|
||||
{
|
||||
rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Reaching here means the remote WAL position has been received, and all
|
||||
* transactions up to that position on the publisher have been applied and
|
||||
@@ -4698,13 +4732,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
|
||||
* Check whether conflict information retention should be stopped due to
|
||||
* exceeding the maximum wait time (max_retention_duration).
|
||||
*
|
||||
* If retention should be stopped, transition to the
|
||||
* RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
|
||||
* false.
|
||||
*
|
||||
* Note: Retention won't be resumed automatically. The user must manually
|
||||
* disable retain_dead_tuples and re-enable it after confirming that the
|
||||
* replication slot maintained by the launcher has been dropped.
|
||||
* If retention should be stopped, return true. Otherwise, return false.
|
||||
*/
|
||||
static bool
|
||||
should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
@@ -4735,11 +4763,6 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
rdt_data->table_sync_wait_time))
|
||||
return false;
|
||||
|
||||
rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
|
||||
|
||||
/* process the next phase */
|
||||
process_rdt_phase_transition(rdt_data, false);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -4748,6 +4771,86 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
*/
|
||||
static void
|
||||
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
{
|
||||
/* Stop retention if not yet */
|
||||
if (MySubscription->retentionactive)
|
||||
{
|
||||
/*
|
||||
* If the retention status cannot be updated (e.g., due to active
|
||||
* transaction), skip further processing to avoid inconsistent
|
||||
* retention behavior.
|
||||
*/
|
||||
if (!update_retention_status(false))
|
||||
return;
|
||||
|
||||
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
||||
MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
|
||||
ereport(LOG,
|
||||
errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
|
||||
MySubscription->name),
|
||||
errdetail("Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms.",
|
||||
MySubscription->maxretention));
|
||||
}
|
||||
|
||||
Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
|
||||
|
||||
/*
|
||||
* If retention has been stopped, reset to the initial phase to retry
|
||||
* resuming retention. This reset is required to recalculate the current
|
||||
* wait time and resume retention if the time falls within
|
||||
* max_retention_duration.
|
||||
*/
|
||||
reset_retention_data_fields(rdt_data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
|
||||
*/
|
||||
static void
|
||||
resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
{
|
||||
/* We can't resume retention without updating retention status. */
|
||||
if (!update_retention_status(true))
|
||||
return;
|
||||
|
||||
ereport(LOG,
|
||||
errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
|
||||
MySubscription->name),
|
||||
MySubscription->maxretention
|
||||
? errdetail("Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms.",
|
||||
MySubscription->maxretention)
|
||||
: errdetail("Retention is re-enabled as max_retention_duration is set to unlimited."));
|
||||
|
||||
/*
|
||||
* Restart the worker to let the launcher initialize
|
||||
* oldest_nonremovable_xid at startup.
|
||||
*
|
||||
* While it's technically possible to derive this value on-the-fly using
|
||||
* the conflict detection slot's xmin, doing so risks a race condition:
|
||||
* the launcher might clean slot.xmin just after retention resumes. This
|
||||
* would make oldest_nonremovable_xid unreliable, especially during xid
|
||||
* wraparound.
|
||||
*
|
||||
* Although this can be prevented by introducing heavy weight locking, the
|
||||
* complexity it will bring doesn't seem worthwhile given how rarely
|
||||
* retention is resumed.
|
||||
*/
|
||||
apply_worker_exit();
|
||||
}
|
||||
|
||||
/*
|
||||
* Updates pg_subscription.subretentionactive to the given value within a
|
||||
* new transaction.
|
||||
*
|
||||
* If already inside an active transaction, skips the update and returns
|
||||
* false.
|
||||
*
|
||||
* Returns true if the update is successfully performed.
|
||||
*/
|
||||
static bool
|
||||
update_retention_status(bool active)
|
||||
{
|
||||
/*
|
||||
* Do not update the catalog during an active transaction. The transaction
|
||||
@@ -4755,7 +4858,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
* rollback of catalog updates if the application fails subsequently.
|
||||
*/
|
||||
if (IsTransactionState())
|
||||
return;
|
||||
return false;
|
||||
|
||||
StartTransactionCommand();
|
||||
|
||||
@@ -4765,26 +4868,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
|
||||
*/
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
|
||||
/* Set pg_subscription.subretentionactive to false */
|
||||
UpdateDeadTupleRetentionStatus(MySubscription->oid, false);
|
||||
/* Update pg_subscription.subretentionactive */
|
||||
UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
|
||||
|
||||
PopActiveSnapshot();
|
||||
CommitTransactionCommand();
|
||||
|
||||
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
||||
MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
|
||||
ereport(LOG,
|
||||
errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
|
||||
MySubscription->name),
|
||||
errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
|
||||
MySubscription->maxretention));
|
||||
|
||||
/* Notify launcher to update the conflict slot */
|
||||
ApplyLauncherWakeup();
|
||||
|
||||
reset_retention_data_fields(rdt_data);
|
||||
MySubscription->retentionactive = active;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -4809,19 +4904,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
|
||||
/*
|
||||
* Adjust the interval for advancing non-removable transaction IDs.
|
||||
*
|
||||
* If there is no activity on the node, we progressively double the interval
|
||||
* used to advance non-removable transaction ID. This helps conserve CPU
|
||||
* and network resources when there's little benefit to frequent updates.
|
||||
* If there is no activity on the node or retention has been stopped, we
|
||||
* progressively double the interval used to advance non-removable transaction
|
||||
* ID. This helps conserve CPU and network resources when there's little benefit
|
||||
* to frequent updates.
|
||||
*
|
||||
* The interval is capped by the lowest of the following:
|
||||
* - wal_receiver_status_interval (if set),
|
||||
* - wal_receiver_status_interval (if set and retention is active),
|
||||
* - a default maximum of 3 minutes,
|
||||
* - max_retention_duration.
|
||||
* - max_retention_duration (if retention is active).
|
||||
*
|
||||
* This ensures the interval never exceeds the retention boundary, even if
|
||||
* other limits are higher. Once activity resumes on the node, the interval
|
||||
* is reset to lesser of 100ms and max_retention_duration, allowing timely
|
||||
* advancement of non-removable transaction ID.
|
||||
* This ensures the interval never exceeds the retention boundary, even if other
|
||||
* limits are higher. Once activity resumes on the node and the retention is
|
||||
* active, the interval is reset to lesser of 100ms and max_retention_duration,
|
||||
* allowing timely advancement of non-removable transaction ID.
|
||||
*
|
||||
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
|
||||
* consider the other interval or a separate GUC if the need arises.
|
||||
@@ -4829,7 +4925,7 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
|
||||
static void
|
||||
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
|
||||
{
|
||||
if (!new_xid_found && rdt_data->xid_advance_interval)
|
||||
if (rdt_data->xid_advance_interval && !new_xid_found)
|
||||
{
|
||||
int max_interval = wal_receiver_status_interval
|
||||
? wal_receiver_status_interval * 1000
|
||||
@@ -4842,6 +4938,18 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
|
||||
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
|
||||
max_interval);
|
||||
}
|
||||
else if (rdt_data->xid_advance_interval &&
|
||||
!MySubscription->retentionactive)
|
||||
{
|
||||
/*
|
||||
* Retention has been stopped, so double the interval-capped at a
|
||||
* maximum of 3 minutes. The wal_receiver_status_interval is
|
||||
* intentionally not used as a upper bound, since the likelihood of
|
||||
* retention resuming is lower than that of general activity resuming.
|
||||
*/
|
||||
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
|
||||
MAX_XID_ADVANCE_INTERVAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
@@ -4851,9 +4959,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
|
||||
rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
|
||||
}
|
||||
|
||||
/* Ensure the wait time remains within the maximum limit */
|
||||
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
|
||||
MySubscription->maxretention);
|
||||
/*
|
||||
* Ensure the wait time remains within the maximum retention time limit
|
||||
* when retention is active.
|
||||
*/
|
||||
if (MySubscription->retentionactive)
|
||||
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
|
||||
MySubscription->maxretention);
|
||||
}
|
||||
|
||||
/*
|
||||
|
Reference in New Issue
Block a user