mirror of
https://github.com/postgres/postgres.git
synced 2025-10-19 15:49:24 +03:00
Detect and report update_deleted conflicts.
This enhancement builds upon the infrastructure introduced in commit
228c370868
, which enables the preservation of deleted tuples and their
origin information on the subscriber. This capability is crucial for
handling concurrent transactions replicated from remote nodes.
The update introduces support for detecting update_deleted conflicts
during the application of update operations on the subscriber. When an
update operation fails to locate the target row-typically because it has
been concurrently deleted-we perform an additional table scan. This scan
uses the SnapshotAny mechanism and we do this additional scan only when
the retain_dead_tuples option is enabled for the relevant subscription.
The goal of this scan is to locate the most recently deleted tuple-matching
the old column values from the remote update-that has not yet been removed
by VACUUM and is still visible according to our slot (i.e., its deletion
is not older than conflict-detection-slot's xmin). If such a tuple is
found, the system reports an update_deleted conflict, including the origin
and transaction details responsible for the deletion.
This provides a groundwork for more robust and accurate conflict
resolution process, preventing unexpected behavior by correctly
identifying cases where a remote update clashes with a deletion from
another origin.
Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
@@ -138,9 +138,9 @@
|
||||
* Each apply worker that enabled retain_dead_tuples option maintains a
|
||||
* non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
|
||||
* prevent dead rows from being removed prematurely when the apply worker still
|
||||
* needs them to detect conflicts reliably. This helps to retain the required
|
||||
* commit_ts module information, which further helps to detect
|
||||
* update_origin_differs and delete_origin_differs conflicts reliably, as
|
||||
* needs them to detect update_deleted conflicts. Additionally, this helps to
|
||||
* retain the required commit_ts module information, which further helps to
|
||||
* detect update_origin_differs and delete_origin_differs conflicts reliably, as
|
||||
* otherwise, vacuum freeze could remove the required information.
|
||||
*
|
||||
* The logical replication launcher manages an internal replication slot named
|
||||
@@ -185,10 +185,10 @@
|
||||
* transactions that occurred concurrently with the tuple DELETE, any
|
||||
* subsequent UPDATE from a remote node should have a later timestamp. In such
|
||||
* cases, it is acceptable to detect an update_missing scenario and convert the
|
||||
* UPDATE to an INSERT when applying it. But, detecting concurrent remote
|
||||
* transactions with earlier timestamps than the DELETE is necessary, as the
|
||||
* UPDATEs in remote transactions should be ignored if their timestamp is
|
||||
* earlier than that of the dead tuples.
|
||||
* UPDATE to an INSERT when applying it. But, for concurrent remote
|
||||
* transactions with earlier timestamps than the DELETE, detecting
|
||||
* update_deleted is necessary, as the UPDATEs in remote transactions should be
|
||||
* ignored if their timestamp is earlier than that of the dead tuples.
|
||||
*
|
||||
* Note that advancing the non-removable transaction ID is not supported if the
|
||||
* publisher is also a physical standby. This is because the logical walsender
|
||||
@@ -576,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel
|
||||
Oid localidxoid,
|
||||
TupleTableSlot *remoteslot,
|
||||
TupleTableSlot **localslot);
|
||||
static bool FindDeletedTupleInLocalRel(Relation localrel,
|
||||
Oid localidxoid,
|
||||
TupleTableSlot *remoteslot,
|
||||
TransactionId *delete_xid,
|
||||
RepOriginId *delete_origin,
|
||||
TimestampTz *delete_time);
|
||||
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
TupleTableSlot *remoteslot,
|
||||
LogicalRepTupleData *newtup,
|
||||
@@ -2912,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
||||
}
|
||||
else
|
||||
{
|
||||
ConflictType type;
|
||||
TupleTableSlot *newslot = localslot;
|
||||
|
||||
/*
|
||||
* Detecting whether the tuple was recently deleted or never existed
|
||||
* is crucial to avoid misleading the user during confict handling.
|
||||
*/
|
||||
if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
|
||||
&conflicttuple.xmin,
|
||||
&conflicttuple.origin,
|
||||
&conflicttuple.ts) &&
|
||||
conflicttuple.origin != replorigin_session_origin)
|
||||
type = CT_UPDATE_DELETED;
|
||||
else
|
||||
type = CT_UPDATE_MISSING;
|
||||
|
||||
/* Store the new tuple for conflict reporting */
|
||||
slot_store_data(newslot, relmapentry, newtup);
|
||||
|
||||
/*
|
||||
* The tuple to be updated could not be found. Do nothing except for
|
||||
* emitting a log message.
|
||||
* The tuple to be updated could not be found or was deleted. Do
|
||||
* nothing except for emitting a log message.
|
||||
*/
|
||||
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
|
||||
remoteslot, newslot, list_make1(&conflicttuple));
|
||||
ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
|
||||
list_make1(&conflicttuple));
|
||||
}
|
||||
|
||||
/* Cleanup. */
|
||||
@@ -3142,6 +3162,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
|
||||
return found;
|
||||
}
|
||||
|
||||
/*
|
||||
* Determine whether the index can reliably locate the deleted tuple in the
|
||||
* local relation.
|
||||
*
|
||||
* An index may exclude deleted tuples if it was re-indexed or re-created during
|
||||
* change application. Therefore, an index is considered usable only if the
|
||||
* conflict detection slot.xmin (conflict_detection_xmin) is greater than the
|
||||
* index tuple's xmin. This ensures that any tuples deleted prior to the index
|
||||
* creation or re-indexing are not relevant for conflict detection in the
|
||||
* current apply worker.
|
||||
*
|
||||
* Note that indexes may also be excluded if they were modified by other DDL
|
||||
* operations, such as ALTER INDEX. However, this is acceptable, as the
|
||||
* likelihood of such DDL changes coinciding with the need to scan dead
|
||||
* tuples for the update_deleted is low.
|
||||
*/
|
||||
static bool
|
||||
IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
|
||||
TransactionId conflict_detection_xmin)
|
||||
{
|
||||
HeapTuple index_tuple;
|
||||
TransactionId index_xmin;
|
||||
|
||||
index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
|
||||
|
||||
if (!HeapTupleIsValid(index_tuple)) /* should not happen */
|
||||
elog(ERROR, "cache lookup failed for index %u", localindexoid);
|
||||
|
||||
/*
|
||||
* No need to check for a frozen transaction ID, as
|
||||
* TransactionIdPrecedes() manages it internally, treating it as falling
|
||||
* behind the conflict_detection_xmin.
|
||||
*/
|
||||
index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
|
||||
|
||||
ReleaseSysCache(index_tuple);
|
||||
|
||||
return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempts to locate a deleted tuple in the local relation that matches the
|
||||
* values of the tuple received from the publication side (in 'remoteslot').
|
||||
* The search is performed using either the replica identity index, primary
|
||||
* key, other available index, or a sequential scan if necessary.
|
||||
*
|
||||
* Returns true if the deleted tuple is found. If found, the transaction ID,
|
||||
* origin, and commit timestamp of the deletion are stored in '*delete_xid',
|
||||
* '*delete_origin', and '*delete_time' respectively.
|
||||
*/
|
||||
static bool
|
||||
FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
|
||||
TupleTableSlot *remoteslot,
|
||||
TransactionId *delete_xid, RepOriginId *delete_origin,
|
||||
TimestampTz *delete_time)
|
||||
{
|
||||
TransactionId oldestxmin;
|
||||
ReplicationSlot *slot;
|
||||
|
||||
/*
|
||||
* Return false if either dead tuples are not retained or commit timestamp
|
||||
* data is not available.
|
||||
*/
|
||||
if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
|
||||
return false;
|
||||
|
||||
/*
|
||||
* For conflict detection, we use the conflict slot's xmin value instead
|
||||
* of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
|
||||
* a threshold to identify tuples that were recently deleted. These tuples
|
||||
* are not visible to concurrent transactions, but we log an
|
||||
* update_deleted conflict if such a tuple matches the remote update being
|
||||
* applied.
|
||||
*
|
||||
* Although GetOldestNonRemovableTransactionId() can return a value older
|
||||
* than the slot's xmin, for our current purpose it is acceptable to treat
|
||||
* tuples deleted by transactions prior to slot.xmin as update_missing
|
||||
* conflicts.
|
||||
*
|
||||
* Ideally, we would use oldest_nonremovable_xid, which is directly
|
||||
* maintained by the leader apply worker. However, this value is not
|
||||
* available to table synchronization or parallel apply workers, making
|
||||
* slot.xmin a practical alternative in those contexts.
|
||||
*/
|
||||
slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
|
||||
|
||||
Assert(slot);
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
oldestxmin = slot->data.xmin;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
Assert(TransactionIdIsValid(oldestxmin));
|
||||
|
||||
if (OidIsValid(localidxoid) &&
|
||||
IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
|
||||
return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
|
||||
remoteslot, oldestxmin,
|
||||
delete_xid, delete_origin,
|
||||
delete_time);
|
||||
else
|
||||
return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
|
||||
oldestxmin, delete_xid,
|
||||
delete_origin, delete_time);
|
||||
}
|
||||
|
||||
/*
|
||||
* This handles insert, update, delete on a partitioned table.
|
||||
*/
|
||||
@@ -3260,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
remoteslot_part, &localslot);
|
||||
if (!found)
|
||||
{
|
||||
ConflictType type;
|
||||
TupleTableSlot *newslot = localslot;
|
||||
|
||||
/*
|
||||
* Detecting whether the tuple was recently deleted or
|
||||
* never existed is crucial to avoid misleading the user
|
||||
* during confict handling.
|
||||
*/
|
||||
if (FindDeletedTupleInLocalRel(partrel,
|
||||
part_entry->localindexoid,
|
||||
remoteslot_part,
|
||||
&conflicttuple.xmin,
|
||||
&conflicttuple.origin,
|
||||
&conflicttuple.ts) &&
|
||||
conflicttuple.origin != replorigin_session_origin)
|
||||
type = CT_UPDATE_DELETED;
|
||||
else
|
||||
type = CT_UPDATE_MISSING;
|
||||
|
||||
/* Store the new tuple for conflict reporting */
|
||||
slot_store_data(newslot, part_entry, newtup);
|
||||
|
||||
/*
|
||||
* The tuple to be updated could not be found. Do nothing
|
||||
* except for emitting a log message.
|
||||
* The tuple to be updated could not be found or was
|
||||
* deleted. Do nothing except for emitting a log message.
|
||||
*/
|
||||
ReportApplyConflict(estate, partrelinfo, LOG,
|
||||
CT_UPDATE_MISSING, remoteslot_part,
|
||||
newslot, list_make1(&conflicttuple));
|
||||
type, remoteslot_part, newslot,
|
||||
list_make1(&conflicttuple));
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -4172,8 +4315,8 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
|
||||
{
|
||||
/*
|
||||
* It is sufficient to manage non-removable transaction ID for a
|
||||
* subscription by the main apply worker to detect conflicts reliably even
|
||||
* for table sync or parallel apply workers.
|
||||
* subscription by the main apply worker to detect update_deleted reliably
|
||||
* even for table sync or parallel apply workers.
|
||||
*/
|
||||
if (!am_leader_apply_worker())
|
||||
return false;
|
||||
@@ -4374,10 +4517,11 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
|
||||
* We expect the publisher and subscriber clocks to be in sync using time
|
||||
* sync service like NTP. Otherwise, we will advance this worker's
|
||||
* oldest_nonremovable_xid prematurely, leading to the removal of rows
|
||||
* required to detect conflicts reliably. This check primarily addresses
|
||||
* scenarios where the publisher's clock falls behind; if the publisher's
|
||||
* clock is ahead, subsequent transactions will naturally bear later
|
||||
* commit timestamps, conforming to the design outlined atop worker.c.
|
||||
* required to detect update_deleted reliably. This check primarily
|
||||
* addresses scenarios where the publisher's clock falls behind; if the
|
||||
* publisher's clock is ahead, subsequent transactions will naturally bear
|
||||
* later commit timestamps, conforming to the design outlined atop
|
||||
* worker.c.
|
||||
*
|
||||
* XXX Consider waiting for the publisher's clock to catch up with the
|
||||
* subscriber's before proceeding to the next phase.
|
||||
|
Reference in New Issue
Block a user