mirror of
https://github.com/postgres/postgres.git
synced 2025-07-12 21:01:52 +03:00
Log the conflicts while applying changes in logical replication.
This patch provides the additional logging information in the following conflict scenarios while applying changes: insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint. update_differ: Updating a row that was previously modified by another origin. update_exists: The updated row value violates a NOT DEFERRABLE unique constraint. update_missing: The tuple to be updated is missing. delete_differ: Deleting a row that was previously modified by another origin. delete_missing: The tuple to be deleted is missing. For insert_exists and update_exists conflicts, the log can include the origin and commit timestamp details of the conflicting key with track_commit_timestamp enabled. update_differ and delete_differ conflicts can only be detected when track_commit_timestamp is enabled on the subscriber. We do not offer additional logging for exclusion constraint violations because these constraints can specify rules that are more complex than simple equality checks. Resolving such conflicts won't be straightforward. This area can be further enhanced if required. Author: Hou Zhijie Reviewed-by: Shveta Malik, Amit Kapila, Nisha Moond, Hayato Kuroda, Dilip Kumar Discussion: https://postgr.es/m/OS0PR01MB5716352552DFADB8E9AD1D8994C92@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
|
||||
|
||||
OBJS = \
|
||||
applyparallelworker.o \
|
||||
conflict.o \
|
||||
decode.o \
|
||||
launcher.o \
|
||||
logical.o \
|
||||
|
488
src/backend/replication/logical/conflict.c
Normal file
488
src/backend/replication/logical/conflict.c
Normal file
@ -0,0 +1,488 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* conflict.c
|
||||
* Support routines for logging conflicts.
|
||||
*
|
||||
* Copyright (c) 2024, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/logical/conflict.c
|
||||
*
|
||||
* This file contains the code for logging conflicts on the subscriber during
|
||||
* logical replication.
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/commit_ts.h"
|
||||
#include "access/tableam.h"
|
||||
#include "executor/executor.h"
|
||||
#include "replication/conflict.h"
|
||||
#include "replication/logicalrelation.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
static const char *const ConflictTypeNames[] = {
|
||||
[CT_INSERT_EXISTS] = "insert_exists",
|
||||
[CT_UPDATE_DIFFER] = "update_differ",
|
||||
[CT_UPDATE_EXISTS] = "update_exists",
|
||||
[CT_UPDATE_MISSING] = "update_missing",
|
||||
[CT_DELETE_DIFFER] = "delete_differ",
|
||||
[CT_DELETE_MISSING] = "delete_missing"
|
||||
};
|
||||
|
||||
static int errcode_apply_conflict(ConflictType type);
|
||||
static int errdetail_apply_conflict(EState *estate,
|
||||
ResultRelInfo *relinfo,
|
||||
ConflictType type,
|
||||
TupleTableSlot *searchslot,
|
||||
TupleTableSlot *localslot,
|
||||
TupleTableSlot *remoteslot,
|
||||
Oid indexoid, TransactionId localxmin,
|
||||
RepOriginId localorigin,
|
||||
TimestampTz localts);
|
||||
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
|
||||
ConflictType type,
|
||||
TupleTableSlot *searchslot,
|
||||
TupleTableSlot *localslot,
|
||||
TupleTableSlot *remoteslot,
|
||||
Oid indexoid);
|
||||
static char *build_index_value_desc(EState *estate, Relation localrel,
|
||||
TupleTableSlot *slot, Oid indexoid);
|
||||
|
||||
/*
|
||||
* Get the xmin and commit timestamp data (origin and timestamp) associated
|
||||
* with the provided local tuple.
|
||||
*
|
||||
* Return true if the commit timestamp data was found, false otherwise.
|
||||
*/
|
||||
bool
|
||||
GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
|
||||
RepOriginId *localorigin, TimestampTz *localts)
|
||||
{
|
||||
Datum xminDatum;
|
||||
bool isnull;
|
||||
|
||||
xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
|
||||
&isnull);
|
||||
*xmin = DatumGetTransactionId(xminDatum);
|
||||
Assert(!isnull);
|
||||
|
||||
/*
|
||||
* The commit timestamp data is not available if track_commit_timestamp is
|
||||
* disabled.
|
||||
*/
|
||||
if (!track_commit_timestamp)
|
||||
{
|
||||
*localorigin = InvalidRepOriginId;
|
||||
*localts = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
|
||||
}
|
||||
|
||||
/*
|
||||
* This function is used to report a conflict while applying replication
|
||||
* changes.
|
||||
*
|
||||
* 'searchslot' should contain the tuple used to search the local tuple to be
|
||||
* updated or deleted.
|
||||
*
|
||||
* 'localslot' should contain the existing local tuple, if any, that conflicts
|
||||
* with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
|
||||
* transaction information related to this existing local tuple.
|
||||
*
|
||||
* 'remoteslot' should contain the remote new tuple, if any.
|
||||
*
|
||||
* The 'indexoid' represents the OID of the unique index that triggered the
|
||||
* constraint violation error. We use this to report the key values for
|
||||
* conflicting tuple.
|
||||
*
|
||||
* The caller must ensure that the index with the OID 'indexoid' is locked so
|
||||
* that we can fetch and display the conflicting key value.
|
||||
*/
|
||||
void
|
||||
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
|
||||
ConflictType type, TupleTableSlot *searchslot,
|
||||
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
|
||||
Oid indexoid, TransactionId localxmin,
|
||||
RepOriginId localorigin, TimestampTz localts)
|
||||
{
|
||||
Relation localrel = relinfo->ri_RelationDesc;
|
||||
|
||||
Assert(!OidIsValid(indexoid) ||
|
||||
CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
|
||||
|
||||
ereport(elevel,
|
||||
errcode_apply_conflict(type),
|
||||
errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
|
||||
get_namespace_name(RelationGetNamespace(localrel)),
|
||||
RelationGetRelationName(localrel),
|
||||
ConflictTypeNames[type]),
|
||||
errdetail_apply_conflict(estate, relinfo, type, searchslot,
|
||||
localslot, remoteslot, indexoid,
|
||||
localxmin, localorigin, localts));
|
||||
}
|
||||
|
||||
/*
|
||||
* Find all unique indexes to check for a conflict and store them into
|
||||
* ResultRelInfo.
|
||||
*/
|
||||
void
|
||||
InitConflictIndexes(ResultRelInfo *relInfo)
|
||||
{
|
||||
List *uniqueIndexes = NIL;
|
||||
|
||||
for (int i = 0; i < relInfo->ri_NumIndices; i++)
|
||||
{
|
||||
Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
|
||||
|
||||
if (indexRelation == NULL)
|
||||
continue;
|
||||
|
||||
/* Detect conflict only for unique indexes */
|
||||
if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
|
||||
continue;
|
||||
|
||||
/* Don't support conflict detection for deferrable index */
|
||||
if (!indexRelation->rd_index->indimmediate)
|
||||
continue;
|
||||
|
||||
uniqueIndexes = lappend_oid(uniqueIndexes,
|
||||
RelationGetRelid(indexRelation));
|
||||
}
|
||||
|
||||
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add SQLSTATE error code to the current conflict report.
|
||||
*/
|
||||
static int
|
||||
errcode_apply_conflict(ConflictType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case CT_INSERT_EXISTS:
|
||||
case CT_UPDATE_EXISTS:
|
||||
return errcode(ERRCODE_UNIQUE_VIOLATION);
|
||||
case CT_UPDATE_DIFFER:
|
||||
case CT_UPDATE_MISSING:
|
||||
case CT_DELETE_DIFFER:
|
||||
case CT_DELETE_MISSING:
|
||||
return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
|
||||
}
|
||||
|
||||
Assert(false);
|
||||
return 0; /* silence compiler warning */
|
||||
}
|
||||
|
||||
/*
|
||||
* Add an errdetail() line showing conflict detail.
|
||||
*
|
||||
* The DETAIL line comprises of two parts:
|
||||
* 1. Explanation of the conflict type, including the origin and commit
|
||||
* timestamp of the existing local tuple.
|
||||
* 2. Display of conflicting key, existing local tuple, remote new tuple, and
|
||||
* replica identity columns, if any. The remote old tuple is excluded as its
|
||||
* information is covered in the replica identity columns.
|
||||
*/
|
||||
static int
|
||||
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
||||
ConflictType type, TupleTableSlot *searchslot,
|
||||
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
|
||||
Oid indexoid, TransactionId localxmin,
|
||||
RepOriginId localorigin, TimestampTz localts)
|
||||
{
|
||||
StringInfoData err_detail;
|
||||
char *val_desc;
|
||||
char *origin_name;
|
||||
|
||||
initStringInfo(&err_detail);
|
||||
|
||||
/* First, construct a detailed message describing the type of conflict */
|
||||
switch (type)
|
||||
{
|
||||
case CT_INSERT_EXISTS:
|
||||
case CT_UPDATE_EXISTS:
|
||||
Assert(OidIsValid(indexoid));
|
||||
|
||||
if (localts)
|
||||
{
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
|
||||
get_rel_name(indexoid),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
else if (replorigin_by_oid(localorigin, true, &origin_name))
|
||||
appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
|
||||
get_rel_name(indexoid), origin_name,
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
|
||||
/*
|
||||
* The origin that modified this row has been removed. This
|
||||
* can happen if the origin was created by a different apply
|
||||
* worker and its associated subscription and origin were
|
||||
* dropped after updating the row, or if the origin was
|
||||
* manually dropped by the user.
|
||||
*/
|
||||
else
|
||||
appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
|
||||
get_rel_name(indexoid),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
}
|
||||
else
|
||||
appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
|
||||
get_rel_name(indexoid), localxmin);
|
||||
|
||||
break;
|
||||
|
||||
case CT_UPDATE_DIFFER:
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
else if (replorigin_by_oid(localorigin, true, &origin_name))
|
||||
appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
|
||||
origin_name, localxmin, timestamptz_to_str(localts));
|
||||
|
||||
/* The origin that modified this row has been removed. */
|
||||
else
|
||||
appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
|
||||
break;
|
||||
|
||||
case CT_UPDATE_MISSING:
|
||||
appendStringInfo(&err_detail, _("Could not find the row to be updated."));
|
||||
break;
|
||||
|
||||
case CT_DELETE_DIFFER:
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
else if (replorigin_by_oid(localorigin, true, &origin_name))
|
||||
appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
|
||||
origin_name, localxmin, timestamptz_to_str(localts));
|
||||
|
||||
/* The origin that modified this row has been removed. */
|
||||
else
|
||||
appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
|
||||
break;
|
||||
|
||||
case CT_DELETE_MISSING:
|
||||
appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
|
||||
break;
|
||||
}
|
||||
|
||||
Assert(err_detail.len > 0);
|
||||
|
||||
val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
|
||||
localslot, remoteslot, indexoid);
|
||||
|
||||
/*
|
||||
* Next, append the key values, existing local tuple, remote tuple and
|
||||
* replica identity columns after the message.
|
||||
*/
|
||||
if (val_desc)
|
||||
appendStringInfo(&err_detail, "\n%s", val_desc);
|
||||
|
||||
return errdetail_internal("%s", err_detail.data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper function to build the additional details for conflicting key,
|
||||
* existing local tuple, remote tuple, and replica identity columns.
|
||||
*
|
||||
* If the return value is NULL, it indicates that the current user lacks
|
||||
* permissions to view the columns involved.
|
||||
*/
|
||||
static char *
|
||||
build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
|
||||
ConflictType type,
|
||||
TupleTableSlot *searchslot,
|
||||
TupleTableSlot *localslot,
|
||||
TupleTableSlot *remoteslot,
|
||||
Oid indexoid)
|
||||
{
|
||||
Relation localrel = relinfo->ri_RelationDesc;
|
||||
Oid relid = RelationGetRelid(localrel);
|
||||
TupleDesc tupdesc = RelationGetDescr(localrel);
|
||||
StringInfoData tuple_value;
|
||||
char *desc = NULL;
|
||||
|
||||
Assert(searchslot || localslot || remoteslot);
|
||||
|
||||
initStringInfo(&tuple_value);
|
||||
|
||||
/*
|
||||
* Report the conflicting key values in the case of a unique constraint
|
||||
* violation.
|
||||
*/
|
||||
if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
|
||||
{
|
||||
Assert(OidIsValid(indexoid) && localslot);
|
||||
|
||||
desc = build_index_value_desc(estate, localrel, localslot, indexoid);
|
||||
|
||||
if (desc)
|
||||
appendStringInfo(&tuple_value, _("Key %s"), desc);
|
||||
}
|
||||
|
||||
if (localslot)
|
||||
{
|
||||
/*
|
||||
* The 'modifiedCols' only applies to the new tuple, hence we pass
|
||||
* NULL for the existing local tuple.
|
||||
*/
|
||||
desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
|
||||
NULL, 64);
|
||||
|
||||
if (desc)
|
||||
{
|
||||
if (tuple_value.len > 0)
|
||||
{
|
||||
appendStringInfoString(&tuple_value, "; ");
|
||||
appendStringInfo(&tuple_value, _("existing local tuple %s"),
|
||||
desc);
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(&tuple_value, _("Existing local tuple %s"),
|
||||
desc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (remoteslot)
|
||||
{
|
||||
Bitmapset *modifiedCols;
|
||||
|
||||
/*
|
||||
* Although logical replication doesn't maintain the bitmap for the
|
||||
* columns being inserted, we still use it to create 'modifiedCols'
|
||||
* for consistency with other calls to ExecBuildSlotValueDescription.
|
||||
*
|
||||
* Note that generated columns are formed locally on the subscriber.
|
||||
*/
|
||||
modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
|
||||
ExecGetUpdatedCols(relinfo, estate));
|
||||
desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
|
||||
modifiedCols, 64);
|
||||
|
||||
if (desc)
|
||||
{
|
||||
if (tuple_value.len > 0)
|
||||
{
|
||||
appendStringInfoString(&tuple_value, "; ");
|
||||
appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (searchslot)
|
||||
{
|
||||
/*
|
||||
* Note that while index other than replica identity may be used (see
|
||||
* IsIndexUsableForReplicaIdentityFull for details) to find the tuple
|
||||
* when applying update or delete, such an index scan may not result
|
||||
* in a unique tuple and we still compare the complete tuple in such
|
||||
* cases, thus such indexes are not used here.
|
||||
*/
|
||||
Oid replica_index = GetRelationIdentityOrPK(localrel);
|
||||
|
||||
Assert(type != CT_INSERT_EXISTS);
|
||||
|
||||
/*
|
||||
* If the table has a valid replica identity index, build the index
|
||||
* key value string. Otherwise, construct the full tuple value for
|
||||
* REPLICA IDENTITY FULL cases.
|
||||
*/
|
||||
if (OidIsValid(replica_index))
|
||||
desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
|
||||
else
|
||||
desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
|
||||
|
||||
if (desc)
|
||||
{
|
||||
if (tuple_value.len > 0)
|
||||
{
|
||||
appendStringInfoString(&tuple_value, "; ");
|
||||
appendStringInfo(&tuple_value, OidIsValid(replica_index)
|
||||
? _("replica identity %s")
|
||||
: _("replica identity full %s"), desc);
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(&tuple_value, OidIsValid(replica_index)
|
||||
? _("Replica identity %s")
|
||||
: _("Replica identity full %s"), desc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tuple_value.len == 0)
|
||||
return NULL;
|
||||
|
||||
appendStringInfoChar(&tuple_value, '.');
|
||||
return tuple_value.data;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper functions to construct a string describing the contents of an index
|
||||
* entry. See BuildIndexValueDescription for details.
|
||||
*
|
||||
* The caller must ensure that the index with the OID 'indexoid' is locked so
|
||||
* that we can fetch and display the conflicting key value.
|
||||
*/
|
||||
static char *
|
||||
build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
|
||||
Oid indexoid)
|
||||
{
|
||||
char *index_value;
|
||||
Relation indexDesc;
|
||||
Datum values[INDEX_MAX_KEYS];
|
||||
bool isnull[INDEX_MAX_KEYS];
|
||||
TupleTableSlot *tableslot = slot;
|
||||
|
||||
if (!tableslot)
|
||||
return NULL;
|
||||
|
||||
Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
|
||||
|
||||
indexDesc = index_open(indexoid, NoLock);
|
||||
|
||||
/*
|
||||
* If the slot is a virtual slot, copy it into a heap tuple slot as
|
||||
* FormIndexDatum only works with heap tuple slots.
|
||||
*/
|
||||
if (TTS_IS_VIRTUAL(slot))
|
||||
{
|
||||
tableslot = table_slot_create(localrel, &estate->es_tupleTable);
|
||||
tableslot = ExecCopySlot(tableslot, slot);
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize ecxt_scantuple for potential use in FormIndexDatum when
|
||||
* index expressions are present.
|
||||
*/
|
||||
GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
|
||||
|
||||
/*
|
||||
* The values/nulls arrays passed to BuildIndexValueDescription should be
|
||||
* the results of FormIndexDatum, which are the "raw" input to the index
|
||||
* AM.
|
||||
*/
|
||||
FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
|
||||
|
||||
index_value = BuildIndexValueDescription(indexDesc, values, isnull);
|
||||
|
||||
index_close(indexDesc, NoLock);
|
||||
|
||||
return index_value;
|
||||
}
|
@ -2,6 +2,7 @@
|
||||
|
||||
backend_sources += files(
|
||||
'applyparallelworker.c',
|
||||
'conflict.c',
|
||||
'decode.c',
|
||||
'launcher.c',
|
||||
'logical.c',
|
||||
|
@ -167,6 +167,7 @@
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/walwriter.h"
|
||||
#include "replication/conflict.h"
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/logicalproto.h"
|
||||
#include "replication/logicalrelation.h"
|
||||
@ -2481,7 +2482,8 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
|
||||
EState *estate = edata->estate;
|
||||
|
||||
/* We must open indexes here. */
|
||||
ExecOpenIndices(relinfo, false);
|
||||
ExecOpenIndices(relinfo, true);
|
||||
InitConflictIndexes(relinfo);
|
||||
|
||||
/* Do the insert. */
|
||||
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
|
||||
@ -2669,13 +2671,12 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
||||
MemoryContext oldctx;
|
||||
|
||||
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
|
||||
ExecOpenIndices(relinfo, false);
|
||||
ExecOpenIndices(relinfo, true);
|
||||
|
||||
found = FindReplTupleInLocalRel(edata, localrel,
|
||||
&relmapentry->remoterel,
|
||||
localindexoid,
|
||||
remoteslot, &localslot);
|
||||
ExecClearTuple(remoteslot);
|
||||
|
||||
/*
|
||||
* Tuple found.
|
||||
@ -2684,6 +2685,28 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
||||
*/
|
||||
if (found)
|
||||
{
|
||||
RepOriginId localorigin;
|
||||
TransactionId localxmin;
|
||||
TimestampTz localts;
|
||||
|
||||
/*
|
||||
* Report the conflict if the tuple was modified by a different
|
||||
* origin.
|
||||
*/
|
||||
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
|
||||
localorigin != replorigin_session_origin)
|
||||
{
|
||||
TupleTableSlot *newslot;
|
||||
|
||||
/* Store the new tuple for conflict reporting */
|
||||
newslot = table_slot_create(localrel, &estate->es_tupleTable);
|
||||
slot_store_data(newslot, relmapentry, newtup);
|
||||
|
||||
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_DIFFER,
|
||||
remoteslot, localslot, newslot,
|
||||
InvalidOid, localxmin, localorigin, localts);
|
||||
}
|
||||
|
||||
/* Process and store remote tuple in the slot */
|
||||
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
slot_modify_data(remoteslot, localslot, relmapentry, newtup);
|
||||
@ -2691,6 +2714,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
||||
|
||||
EvalPlanQualSetSlot(&epqstate, remoteslot);
|
||||
|
||||
InitConflictIndexes(relinfo);
|
||||
|
||||
/* Do the actual update. */
|
||||
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
|
||||
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
|
||||
@ -2698,16 +2723,19 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
||||
}
|
||||
else
|
||||
{
|
||||
TupleTableSlot *newslot = localslot;
|
||||
|
||||
/* Store the new tuple for conflict reporting */
|
||||
slot_store_data(newslot, relmapentry, newtup);
|
||||
|
||||
/*
|
||||
* The tuple to be updated could not be found. Do nothing except for
|
||||
* emitting a log message.
|
||||
*
|
||||
* XXX should this be promoted to ereport(LOG) perhaps?
|
||||
*/
|
||||
elog(DEBUG1,
|
||||
"logical replication did not find row to be updated "
|
||||
"in replication target relation \"%s\"",
|
||||
RelationGetRelationName(localrel));
|
||||
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
|
||||
remoteslot, NULL, newslot,
|
||||
InvalidOid, InvalidTransactionId,
|
||||
InvalidRepOriginId, 0);
|
||||
}
|
||||
|
||||
/* Cleanup. */
|
||||
@ -2830,6 +2858,20 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
|
||||
/* If found delete it. */
|
||||
if (found)
|
||||
{
|
||||
RepOriginId localorigin;
|
||||
TransactionId localxmin;
|
||||
TimestampTz localts;
|
||||
|
||||
/*
|
||||
* Report the conflict if the tuple was modified by a different
|
||||
* origin.
|
||||
*/
|
||||
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
|
||||
localorigin != replorigin_session_origin)
|
||||
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_DIFFER,
|
||||
remoteslot, localslot, NULL,
|
||||
InvalidOid, localxmin, localorigin, localts);
|
||||
|
||||
EvalPlanQualSetSlot(&epqstate, localslot);
|
||||
|
||||
/* Do the actual delete. */
|
||||
@ -2841,13 +2883,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
|
||||
/*
|
||||
* The tuple to be deleted could not be found. Do nothing except for
|
||||
* emitting a log message.
|
||||
*
|
||||
* XXX should this be promoted to ereport(LOG) perhaps?
|
||||
*/
|
||||
elog(DEBUG1,
|
||||
"logical replication did not find row to be deleted "
|
||||
"in replication target relation \"%s\"",
|
||||
RelationGetRelationName(localrel));
|
||||
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
|
||||
remoteslot, NULL, NULL,
|
||||
InvalidOid, InvalidTransactionId,
|
||||
InvalidRepOriginId, 0);
|
||||
}
|
||||
|
||||
/* Cleanup. */
|
||||
@ -3015,6 +3055,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
Relation partrel_new;
|
||||
bool found;
|
||||
EPQState epqstate;
|
||||
RepOriginId localorigin;
|
||||
TransactionId localxmin;
|
||||
TimestampTz localts;
|
||||
|
||||
/* Get the matching local tuple from the partition. */
|
||||
found = FindReplTupleInLocalRel(edata, partrel,
|
||||
@ -3023,19 +3066,43 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
remoteslot_part, &localslot);
|
||||
if (!found)
|
||||
{
|
||||
TupleTableSlot *newslot = localslot;
|
||||
|
||||
/* Store the new tuple for conflict reporting */
|
||||
slot_store_data(newslot, part_entry, newtup);
|
||||
|
||||
/*
|
||||
* The tuple to be updated could not be found. Do nothing
|
||||
* except for emitting a log message.
|
||||
*
|
||||
* XXX should this be promoted to ereport(LOG) perhaps?
|
||||
*/
|
||||
elog(DEBUG1,
|
||||
"logical replication did not find row to be updated "
|
||||
"in replication target relation's partition \"%s\"",
|
||||
RelationGetRelationName(partrel));
|
||||
ReportApplyConflict(estate, partrelinfo,
|
||||
LOG, CT_UPDATE_MISSING,
|
||||
remoteslot_part, NULL, newslot,
|
||||
InvalidOid, InvalidTransactionId,
|
||||
InvalidRepOriginId, 0);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Report the conflict if the tuple was modified by a
|
||||
* different origin.
|
||||
*/
|
||||
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
|
||||
localorigin != replorigin_session_origin)
|
||||
{
|
||||
TupleTableSlot *newslot;
|
||||
|
||||
/* Store the new tuple for conflict reporting */
|
||||
newslot = table_slot_create(partrel, &estate->es_tupleTable);
|
||||
slot_store_data(newslot, part_entry, newtup);
|
||||
|
||||
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_DIFFER,
|
||||
remoteslot_part, localslot, newslot,
|
||||
InvalidOid, localxmin, localorigin,
|
||||
localts);
|
||||
}
|
||||
|
||||
/*
|
||||
* Apply the update to the local tuple, putting the result in
|
||||
* remoteslot_part.
|
||||
@ -3046,7 +3113,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
|
||||
ExecOpenIndices(partrelinfo, false);
|
||||
|
||||
/*
|
||||
* Does the updated tuple still satisfy the current
|
||||
@ -3063,6 +3129,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
* work already done above to find the local tuple in the
|
||||
* partition.
|
||||
*/
|
||||
ExecOpenIndices(partrelinfo, true);
|
||||
InitConflictIndexes(partrelinfo);
|
||||
|
||||
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
|
||||
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
|
||||
ACL_UPDATE);
|
||||
@ -3110,6 +3179,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
get_namespace_name(RelationGetNamespace(partrel_new)),
|
||||
RelationGetRelationName(partrel_new));
|
||||
|
||||
ExecOpenIndices(partrelinfo, false);
|
||||
|
||||
/* DELETE old tuple found in the old partition. */
|
||||
EvalPlanQualSetSlot(&epqstate, localslot);
|
||||
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
|
||||
|
Reference in New Issue
Block a user