mirror of
https://github.com/postgres/postgres.git
synced 2025-04-20 00:42:27 +03:00
Detect and Log multiple_unique_conflicts type conflict.
Introduce a new conflict type, multiple_unique_conflicts, to handle cases where an incoming row during logical replication violates multiple UNIQUE constraints. Previously, the apply worker detected and reported only the first encountered key conflict (insert_exists/update_exists), causing repeated failures as each constraint violation needs to be handled one by one making the process slow and error-prone. With this patch, the apply worker checks all unique constraints upfront once the first key conflict is detected and reports multiple_unique_conflicts if multiple violations exist. This allows users to resolve all conflicts at once by deleting all conflicting tuples rather than dealing with them individually or skipping the transaction. In the future, this will also allow us to specify different resolution handlers for such a conflict type. Add the stats for this conflict type in pg_stat_subscription_stats. Author: Nisha Moond <nisha.moond412@gmail.com> Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Discussion: https://postgr.es/m/CABdArM7FW-_dnthGkg2s0fy1HhUB8C3ELA0gZX1kkbs1ZZoV3Q@mail.gmail.com
This commit is contained in:
parent
35a92b7c25
commit
73eba5004a
@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
|
|||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
<varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
|
||||||
|
<term><literal>multiple_unique_conflicts</literal></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Inserting or updating a row violates multiple
|
||||||
|
<literal>NOT DEFERRABLE</literal> unique constraints. Note that to log
|
||||||
|
the origin and commit timestamp details of conflicting keys, ensure
|
||||||
|
that <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
|
||||||
|
is enabled on the subscriber. In this case, an error will be raised until
|
||||||
|
the conflict is resolved manually.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
</variablelist>
|
</variablelist>
|
||||||
Note that there are other conflict scenarios, such as exclusion constraint
|
Note that there are other conflict scenarios, such as exclusion constraint
|
||||||
violations. Currently, we do not provide additional details for them in the
|
violations. Currently, we do not provide additional details for them in the
|
||||||
@ -1935,8 +1948,8 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
|
|||||||
<para>
|
<para>
|
||||||
The <literal>Key</literal> section includes the key values of the local
|
The <literal>Key</literal> section includes the key values of the local
|
||||||
tuple that violated a unique constraint for
|
tuple that violated a unique constraint for
|
||||||
<literal>insert_exists</literal> or <literal>update_exists</literal>
|
<literal>insert_exists</literal>, <literal>update_exists</literal> or
|
||||||
conflicts.
|
<literal>multiple_unique_conflicts</literal> conflicts.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
<listitem>
|
<listitem>
|
||||||
@ -1945,8 +1958,8 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
|
|||||||
tuple if its origin differs from the remote tuple for
|
tuple if its origin differs from the remote tuple for
|
||||||
<literal>update_origin_differs</literal> or <literal>delete_origin_differs</literal>
|
<literal>update_origin_differs</literal> or <literal>delete_origin_differs</literal>
|
||||||
conflicts, or if the key value conflicts with the remote tuple for
|
conflicts, or if the key value conflicts with the remote tuple for
|
||||||
<literal>insert_exists</literal> or <literal>update_exists</literal>
|
<literal>insert_exists</literal>, <literal>update_exists</literal> or
|
||||||
conflicts.
|
<literal>multiple_unique_conflicts</literal> conflicts.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
<listitem>
|
<listitem>
|
||||||
@ -1982,6 +1995,16 @@ DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
|
|||||||
The large column values are truncated to 64 bytes.
|
The large column values are truncated to 64 bytes.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Note that in case of <literal>multiple_unique_conflicts</literal> conflict,
|
||||||
|
multiple <replaceable class="parameter">detailed_explanation</replaceable>
|
||||||
|
and <replaceable class="parameter">detail_values</replaceable> lines
|
||||||
|
will be generated, each detailing the conflict information associated
|
||||||
|
with distinct unique
|
||||||
|
constraints.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
</itemizedlist>
|
</itemizedlist>
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
@ -2250,6 +2250,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
|
|||||||
</para></entry>
|
</para></entry>
|
||||||
</row>
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
|
<structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
Number of times a row insertion or an updated row values violated multiple
|
||||||
|
<literal>NOT DEFERRABLE</literal> unique constraints during the
|
||||||
|
application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
|
||||||
|
for details about this conflict.
|
||||||
|
</para></entry>
|
||||||
|
</row>
|
||||||
|
|
||||||
<row>
|
<row>
|
||||||
<entry role="catalog_table_entry"><para role="column_definition">
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
|
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
|
||||||
|
@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
|
|||||||
ss.confl_update_missing,
|
ss.confl_update_missing,
|
||||||
ss.confl_delete_origin_differs,
|
ss.confl_delete_origin_differs,
|
||||||
ss.confl_delete_missing,
|
ss.confl_delete_missing,
|
||||||
|
ss.confl_multiple_unique_conflicts,
|
||||||
ss.stats_reset
|
ss.stats_reset
|
||||||
FROM pg_subscription as s,
|
FROM pg_subscription as s,
|
||||||
pg_stat_get_subscription_stats(s.oid) as ss;
|
pg_stat_get_subscription_stats(s.oid) as ss;
|
||||||
|
@ -493,25 +493,33 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
|
|||||||
ConflictType type, List *recheckIndexes,
|
ConflictType type, List *recheckIndexes,
|
||||||
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
|
TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
|
||||||
{
|
{
|
||||||
/* Check all the unique indexes for a conflict */
|
List *conflicttuples = NIL;
|
||||||
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
|
|
||||||
{
|
|
||||||
TupleTableSlot *conflictslot;
|
TupleTableSlot *conflictslot;
|
||||||
|
|
||||||
|
/* Check all the unique indexes for conflicts */
|
||||||
|
foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
|
||||||
|
{
|
||||||
if (list_member_oid(recheckIndexes, uniqueidx) &&
|
if (list_member_oid(recheckIndexes, uniqueidx) &&
|
||||||
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
|
FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
|
||||||
&conflictslot))
|
&conflictslot))
|
||||||
{
|
{
|
||||||
RepOriginId origin;
|
ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
|
||||||
TimestampTz committs;
|
|
||||||
TransactionId xmin;
|
|
||||||
|
|
||||||
GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
|
conflicttuple->slot = conflictslot;
|
||||||
ReportApplyConflict(estate, resultRelInfo, ERROR, type,
|
conflicttuple->indexoid = uniqueidx;
|
||||||
searchslot, conflictslot, remoteslot,
|
|
||||||
uniqueidx, xmin, origin, committs);
|
GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
|
||||||
|
&conflicttuple->origin, &conflicttuple->ts);
|
||||||
|
|
||||||
|
conflicttuples = lappend(conflicttuples, conflicttuple);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Report the conflict, if found */
|
||||||
|
if (conflicttuples)
|
||||||
|
ReportApplyConflict(estate, resultRelInfo, ERROR,
|
||||||
|
list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
|
||||||
|
searchslot, remoteslot, conflicttuples);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
|
|||||||
[CT_UPDATE_EXISTS] = "update_exists",
|
[CT_UPDATE_EXISTS] = "update_exists",
|
||||||
[CT_UPDATE_MISSING] = "update_missing",
|
[CT_UPDATE_MISSING] = "update_missing",
|
||||||
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
|
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
|
||||||
[CT_DELETE_MISSING] = "delete_missing"
|
[CT_DELETE_MISSING] = "delete_missing",
|
||||||
|
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
|
||||||
};
|
};
|
||||||
|
|
||||||
static int errcode_apply_conflict(ConflictType type);
|
static int errcode_apply_conflict(ConflictType type);
|
||||||
static int errdetail_apply_conflict(EState *estate,
|
static void errdetail_apply_conflict(EState *estate,
|
||||||
ResultRelInfo *relinfo,
|
ResultRelInfo *relinfo,
|
||||||
ConflictType type,
|
ConflictType type,
|
||||||
TupleTableSlot *searchslot,
|
TupleTableSlot *searchslot,
|
||||||
@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
|
|||||||
TupleTableSlot *remoteslot,
|
TupleTableSlot *remoteslot,
|
||||||
Oid indexoid, TransactionId localxmin,
|
Oid indexoid, TransactionId localxmin,
|
||||||
RepOriginId localorigin,
|
RepOriginId localorigin,
|
||||||
TimestampTz localts);
|
TimestampTz localts, StringInfo err_msg);
|
||||||
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
|
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
|
||||||
ConflictType type,
|
ConflictType type,
|
||||||
TupleTableSlot *searchslot,
|
TupleTableSlot *searchslot,
|
||||||
@ -90,30 +91,33 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
|
|||||||
* 'searchslot' should contain the tuple used to search the local tuple to be
|
* 'searchslot' should contain the tuple used to search the local tuple to be
|
||||||
* updated or deleted.
|
* updated or deleted.
|
||||||
*
|
*
|
||||||
* 'localslot' should contain the existing local tuple, if any, that conflicts
|
|
||||||
* with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
|
|
||||||
* transaction information related to this existing local tuple.
|
|
||||||
*
|
|
||||||
* 'remoteslot' should contain the remote new tuple, if any.
|
* 'remoteslot' should contain the remote new tuple, if any.
|
||||||
*
|
*
|
||||||
* The 'indexoid' represents the OID of the unique index that triggered the
|
* conflicttuples is a list of local tuples that caused the conflict and the
|
||||||
* constraint violation error. We use this to report the key values for
|
* conflict related information. See ConflictTupleInfo.
|
||||||
* conflicting tuple.
|
|
||||||
*
|
*
|
||||||
* The caller must ensure that the index with the OID 'indexoid' is locked so
|
* The caller must ensure that all the indexes passed in ConflictTupleInfo are
|
||||||
* that we can fetch and display the conflicting key value.
|
* locked so that we can fetch and display the conflicting key values.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
|
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
|
||||||
ConflictType type, TupleTableSlot *searchslot,
|
ConflictType type, TupleTableSlot *searchslot,
|
||||||
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
|
TupleTableSlot *remoteslot, List *conflicttuples)
|
||||||
Oid indexoid, TransactionId localxmin,
|
|
||||||
RepOriginId localorigin, TimestampTz localts)
|
|
||||||
{
|
{
|
||||||
Relation localrel = relinfo->ri_RelationDesc;
|
Relation localrel = relinfo->ri_RelationDesc;
|
||||||
|
StringInfoData err_detail;
|
||||||
|
|
||||||
Assert(!OidIsValid(indexoid) ||
|
initStringInfo(&err_detail);
|
||||||
CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
|
|
||||||
|
/* Form errdetail message by combining conflicting tuples information. */
|
||||||
|
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
|
||||||
|
errdetail_apply_conflict(estate, relinfo, type, searchslot,
|
||||||
|
conflicttuple->slot, remoteslot,
|
||||||
|
conflicttuple->indexoid,
|
||||||
|
conflicttuple->xmin,
|
||||||
|
conflicttuple->origin,
|
||||||
|
conflicttuple->ts,
|
||||||
|
&err_detail);
|
||||||
|
|
||||||
pgstat_report_subscription_conflict(MySubscription->oid, type);
|
pgstat_report_subscription_conflict(MySubscription->oid, type);
|
||||||
|
|
||||||
@ -123,9 +127,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
|
|||||||
get_namespace_name(RelationGetNamespace(localrel)),
|
get_namespace_name(RelationGetNamespace(localrel)),
|
||||||
RelationGetRelationName(localrel),
|
RelationGetRelationName(localrel),
|
||||||
ConflictTypeNames[type]),
|
ConflictTypeNames[type]),
|
||||||
errdetail_apply_conflict(estate, relinfo, type, searchslot,
|
errdetail_internal("%s", err_detail.data));
|
||||||
localslot, remoteslot, indexoid,
|
|
||||||
localxmin, localorigin, localts));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -169,6 +171,7 @@ errcode_apply_conflict(ConflictType type)
|
|||||||
{
|
{
|
||||||
case CT_INSERT_EXISTS:
|
case CT_INSERT_EXISTS:
|
||||||
case CT_UPDATE_EXISTS:
|
case CT_UPDATE_EXISTS:
|
||||||
|
case CT_MULTIPLE_UNIQUE_CONFLICTS:
|
||||||
return errcode(ERRCODE_UNIQUE_VIOLATION);
|
return errcode(ERRCODE_UNIQUE_VIOLATION);
|
||||||
case CT_UPDATE_ORIGIN_DIFFERS:
|
case CT_UPDATE_ORIGIN_DIFFERS:
|
||||||
case CT_UPDATE_MISSING:
|
case CT_UPDATE_MISSING:
|
||||||
@ -191,12 +194,13 @@ errcode_apply_conflict(ConflictType type)
|
|||||||
* replica identity columns, if any. The remote old tuple is excluded as its
|
* replica identity columns, if any. The remote old tuple is excluded as its
|
||||||
* information is covered in the replica identity columns.
|
* information is covered in the replica identity columns.
|
||||||
*/
|
*/
|
||||||
static int
|
static void
|
||||||
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
||||||
ConflictType type, TupleTableSlot *searchslot,
|
ConflictType type, TupleTableSlot *searchslot,
|
||||||
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
|
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
|
||||||
Oid indexoid, TransactionId localxmin,
|
Oid indexoid, TransactionId localxmin,
|
||||||
RepOriginId localorigin, TimestampTz localts)
|
RepOriginId localorigin, TimestampTz localts,
|
||||||
|
StringInfo err_msg)
|
||||||
{
|
{
|
||||||
StringInfoData err_detail;
|
StringInfoData err_detail;
|
||||||
char *val_desc;
|
char *val_desc;
|
||||||
@ -209,7 +213,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||||||
{
|
{
|
||||||
case CT_INSERT_EXISTS:
|
case CT_INSERT_EXISTS:
|
||||||
case CT_UPDATE_EXISTS:
|
case CT_UPDATE_EXISTS:
|
||||||
Assert(OidIsValid(indexoid));
|
case CT_MULTIPLE_UNIQUE_CONFLICTS:
|
||||||
|
Assert(OidIsValid(indexoid) &&
|
||||||
|
CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
|
||||||
|
|
||||||
if (localts)
|
if (localts)
|
||||||
{
|
{
|
||||||
@ -291,7 +297,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||||||
if (val_desc)
|
if (val_desc)
|
||||||
appendStringInfo(&err_detail, "\n%s", val_desc);
|
appendStringInfo(&err_detail, "\n%s", val_desc);
|
||||||
|
|
||||||
return errdetail_internal("%s", err_detail.data);
|
/*
|
||||||
|
* Insert a blank line to visually separate the new detail line from the
|
||||||
|
* existing ones.
|
||||||
|
*/
|
||||||
|
if (err_msg->len > 0)
|
||||||
|
appendStringInfoChar(err_msg, '\n');
|
||||||
|
|
||||||
|
appendStringInfo(err_msg, "%s", err_detail.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -323,7 +336,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
|
|||||||
* Report the conflicting key values in the case of a unique constraint
|
* Report the conflicting key values in the case of a unique constraint
|
||||||
* violation.
|
* violation.
|
||||||
*/
|
*/
|
||||||
if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
|
if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
|
||||||
|
type == CT_MULTIPLE_UNIQUE_CONFLICTS)
|
||||||
{
|
{
|
||||||
Assert(OidIsValid(indexoid) && localslot);
|
Assert(OidIsValid(indexoid) && localslot);
|
||||||
|
|
||||||
|
@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
|||||||
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
|
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
|
||||||
Relation localrel = relinfo->ri_RelationDesc;
|
Relation localrel = relinfo->ri_RelationDesc;
|
||||||
EPQState epqstate;
|
EPQState epqstate;
|
||||||
TupleTableSlot *localslot;
|
TupleTableSlot *localslot = NULL;
|
||||||
|
ConflictTupleInfo conflicttuple = {0};
|
||||||
bool found;
|
bool found;
|
||||||
MemoryContext oldctx;
|
MemoryContext oldctx;
|
||||||
|
|
||||||
@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
|||||||
*/
|
*/
|
||||||
if (found)
|
if (found)
|
||||||
{
|
{
|
||||||
RepOriginId localorigin;
|
|
||||||
TransactionId localxmin;
|
|
||||||
TimestampTz localts;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Report the conflict if the tuple was modified by a different
|
* Report the conflict if the tuple was modified by a different
|
||||||
* origin.
|
* origin.
|
||||||
*/
|
*/
|
||||||
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
|
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
|
||||||
localorigin != replorigin_session_origin)
|
&conflicttuple.origin, &conflicttuple.ts) &&
|
||||||
|
conflicttuple.origin != replorigin_session_origin)
|
||||||
{
|
{
|
||||||
TupleTableSlot *newslot;
|
TupleTableSlot *newslot;
|
||||||
|
|
||||||
@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
|||||||
newslot = table_slot_create(localrel, &estate->es_tupleTable);
|
newslot = table_slot_create(localrel, &estate->es_tupleTable);
|
||||||
slot_store_data(newslot, relmapentry, newtup);
|
slot_store_data(newslot, relmapentry, newtup);
|
||||||
|
|
||||||
|
conflicttuple.slot = localslot;
|
||||||
|
|
||||||
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
|
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
|
||||||
remoteslot, localslot, newslot,
|
remoteslot, newslot,
|
||||||
InvalidOid, localxmin, localorigin, localts);
|
list_make1(&conflicttuple));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process and store remote tuple in the slot */
|
/* Process and store remote tuple in the slot */
|
||||||
@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
|
|||||||
* emitting a log message.
|
* emitting a log message.
|
||||||
*/
|
*/
|
||||||
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
|
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
|
||||||
remoteslot, NULL, newslot,
|
remoteslot, newslot, list_make1(&conflicttuple));
|
||||||
InvalidOid, InvalidTransactionId,
|
|
||||||
InvalidRepOriginId, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Cleanup. */
|
/* Cleanup. */
|
||||||
@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
|
|||||||
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
|
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
|
||||||
EPQState epqstate;
|
EPQState epqstate;
|
||||||
TupleTableSlot *localslot;
|
TupleTableSlot *localslot;
|
||||||
|
ConflictTupleInfo conflicttuple = {0};
|
||||||
bool found;
|
bool found;
|
||||||
|
|
||||||
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
|
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
|
||||||
@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
|
|||||||
/* If found delete it. */
|
/* If found delete it. */
|
||||||
if (found)
|
if (found)
|
||||||
{
|
{
|
||||||
RepOriginId localorigin;
|
|
||||||
TransactionId localxmin;
|
|
||||||
TimestampTz localts;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Report the conflict if the tuple was modified by a different
|
* Report the conflict if the tuple was modified by a different
|
||||||
* origin.
|
* origin.
|
||||||
*/
|
*/
|
||||||
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
|
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
|
||||||
localorigin != replorigin_session_origin)
|
&conflicttuple.origin, &conflicttuple.ts) &&
|
||||||
|
conflicttuple.origin != replorigin_session_origin)
|
||||||
|
{
|
||||||
|
conflicttuple.slot = localslot;
|
||||||
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
|
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
|
||||||
remoteslot, localslot, NULL,
|
remoteslot, NULL,
|
||||||
InvalidOid, localxmin, localorigin, localts);
|
list_make1(&conflicttuple));
|
||||||
|
}
|
||||||
|
|
||||||
EvalPlanQualSetSlot(&epqstate, localslot);
|
EvalPlanQualSetSlot(&epqstate, localslot);
|
||||||
|
|
||||||
@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
|
|||||||
* emitting a log message.
|
* emitting a log message.
|
||||||
*/
|
*/
|
||||||
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
|
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
|
||||||
remoteslot, NULL, NULL,
|
remoteslot, NULL, list_make1(&conflicttuple));
|
||||||
InvalidOid, InvalidTransactionId,
|
|
||||||
InvalidRepOriginId, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Cleanup. */
|
/* Cleanup. */
|
||||||
@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
Relation partrel_new;
|
Relation partrel_new;
|
||||||
bool found;
|
bool found;
|
||||||
EPQState epqstate;
|
EPQState epqstate;
|
||||||
RepOriginId localorigin;
|
ConflictTupleInfo conflicttuple = {0};
|
||||||
TransactionId localxmin;
|
|
||||||
TimestampTz localts;
|
|
||||||
|
|
||||||
/* Get the matching local tuple from the partition. */
|
/* Get the matching local tuple from the partition. */
|
||||||
found = FindReplTupleInLocalRel(edata, partrel,
|
found = FindReplTupleInLocalRel(edata, partrel,
|
||||||
@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
* The tuple to be updated could not be found. Do nothing
|
* The tuple to be updated could not be found. Do nothing
|
||||||
* except for emitting a log message.
|
* except for emitting a log message.
|
||||||
*/
|
*/
|
||||||
ReportApplyConflict(estate, partrelinfo,
|
ReportApplyConflict(estate, partrelinfo, LOG,
|
||||||
LOG, CT_UPDATE_MISSING,
|
CT_UPDATE_MISSING, remoteslot_part,
|
||||||
remoteslot_part, NULL, newslot,
|
newslot, list_make1(&conflicttuple));
|
||||||
InvalidOid, InvalidTransactionId,
|
|
||||||
InvalidRepOriginId, 0);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
* Report the conflict if the tuple was modified by a
|
* Report the conflict if the tuple was modified by a
|
||||||
* different origin.
|
* different origin.
|
||||||
*/
|
*/
|
||||||
if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
|
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
|
||||||
localorigin != replorigin_session_origin)
|
&conflicttuple.origin,
|
||||||
|
&conflicttuple.ts) &&
|
||||||
|
conflicttuple.origin != replorigin_session_origin)
|
||||||
{
|
{
|
||||||
TupleTableSlot *newslot;
|
TupleTableSlot *newslot;
|
||||||
|
|
||||||
@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
|
|||||||
newslot = table_slot_create(partrel, &estate->es_tupleTable);
|
newslot = table_slot_create(partrel, &estate->es_tupleTable);
|
||||||
slot_store_data(newslot, part_entry, newtup);
|
slot_store_data(newslot, part_entry, newtup);
|
||||||
|
|
||||||
|
conflicttuple.slot = localslot;
|
||||||
|
|
||||||
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
|
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
|
||||||
remoteslot_part, localslot, newslot,
|
remoteslot_part, newslot,
|
||||||
InvalidOid, localxmin, localorigin,
|
list_make1(&conflicttuple));
|
||||||
localts);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
|||||||
Datum
|
Datum
|
||||||
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
|
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
|
#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
|
||||||
Oid subid = PG_GETARG_OID(0);
|
Oid subid = PG_GETARG_OID(0);
|
||||||
TupleDesc tupdesc;
|
TupleDesc tupdesc;
|
||||||
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
|
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
|
||||||
@ -2203,7 +2203,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
|
|||||||
INT8OID, -1, 0);
|
INT8OID, -1, 0);
|
||||||
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
|
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
|
||||||
INT8OID, -1, 0);
|
INT8OID, -1, 0);
|
||||||
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
|
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
|
||||||
|
INT8OID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
|
||||||
TIMESTAMPTZOID, -1, 0);
|
TIMESTAMPTZOID, -1, 0);
|
||||||
BlessTupleDesc(tupdesc);
|
BlessTupleDesc(tupdesc);
|
||||||
|
|
||||||
|
@ -57,6 +57,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/* yyyymmddN */
|
/* yyyymmddN */
|
||||||
#define CATALOG_VERSION_NO 202503131
|
#define CATALOG_VERSION_NO 202503241
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -5647,9 +5647,9 @@
|
|||||||
{ oid => '6231', descr => 'statistics: information about subscription stats',
|
{ oid => '6231', descr => 'statistics: information about subscription stats',
|
||||||
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
|
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
|
||||||
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
|
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
|
||||||
proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
|
proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
|
||||||
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
|
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
|
||||||
proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
|
proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
|
||||||
prosrc => 'pg_stat_get_subscription_stats' },
|
prosrc => 'pg_stat_get_subscription_stats' },
|
||||||
{ oid => '6118', descr => 'statistics: information about subscription',
|
{ oid => '6118', descr => 'statistics: information about subscription',
|
||||||
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
|
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
|
||||||
|
@ -41,6 +41,9 @@ typedef enum
|
|||||||
/* The row to be deleted is missing */
|
/* The row to be deleted is missing */
|
||||||
CT_DELETE_MISSING,
|
CT_DELETE_MISSING,
|
||||||
|
|
||||||
|
/* The row to be inserted/updated violates multiple unique constraint */
|
||||||
|
CT_MULTIPLE_UNIQUE_CONFLICTS,
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Other conflicts, such as exclusion constraint violations, involve more
|
* Other conflicts, such as exclusion constraint violations, involve more
|
||||||
* complex rules than simple equality checks. These conflicts are left for
|
* complex rules than simple equality checks. These conflicts are left for
|
||||||
@ -48,7 +51,23 @@ typedef enum
|
|||||||
*/
|
*/
|
||||||
} ConflictType;
|
} ConflictType;
|
||||||
|
|
||||||
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
|
#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Information for the existing local tuple that caused the conflict.
|
||||||
|
*/
|
||||||
|
typedef struct ConflictTupleInfo
|
||||||
|
{
|
||||||
|
TupleTableSlot *slot; /* tuple slot holding the conflicting local
|
||||||
|
* tuple */
|
||||||
|
Oid indexoid; /* OID of the index where the conflict
|
||||||
|
* occurred */
|
||||||
|
TransactionId xmin; /* transaction ID of the modification causing
|
||||||
|
* the conflict */
|
||||||
|
RepOriginId origin; /* origin identifier of the modification */
|
||||||
|
TimestampTz ts; /* timestamp of when the modification on the
|
||||||
|
* conflicting local tuple occurred */
|
||||||
|
} ConflictTupleInfo;
|
||||||
|
|
||||||
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
|
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
|
||||||
TransactionId *xmin,
|
TransactionId *xmin,
|
||||||
@ -57,10 +76,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
|
|||||||
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
|
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
|
||||||
int elevel, ConflictType type,
|
int elevel, ConflictType type,
|
||||||
TupleTableSlot *searchslot,
|
TupleTableSlot *searchslot,
|
||||||
TupleTableSlot *localslot,
|
|
||||||
TupleTableSlot *remoteslot,
|
TupleTableSlot *remoteslot,
|
||||||
Oid indexoid, TransactionId localxmin,
|
List *conflicttuples);
|
||||||
RepOriginId localorigin, TimestampTz localts);
|
|
||||||
extern void InitConflictIndexes(ResultRelInfo *relInfo);
|
extern void InitConflictIndexes(ResultRelInfo *relInfo);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
|
|||||||
ss.confl_update_missing,
|
ss.confl_update_missing,
|
||||||
ss.confl_delete_origin_differs,
|
ss.confl_delete_origin_differs,
|
||||||
ss.confl_delete_missing,
|
ss.confl_delete_missing,
|
||||||
|
ss.confl_multiple_unique_conflicts,
|
||||||
ss.stats_reset
|
ss.stats_reset
|
||||||
FROM pg_subscription s,
|
FROM pg_subscription s,
|
||||||
LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
|
LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
|
||||||
pg_stat_sys_indexes| SELECT relid,
|
pg_stat_sys_indexes| SELECT relid,
|
||||||
indexrelid,
|
indexrelid,
|
||||||
schemaname,
|
schemaname,
|
||||||
|
@ -41,6 +41,7 @@ tests += {
|
|||||||
't/032_subscribe_use_index.pl',
|
't/032_subscribe_use_index.pl',
|
||||||
't/033_run_as_table_owner.pl',
|
't/033_run_as_table_owner.pl',
|
||||||
't/034_temporal.pl',
|
't/034_temporal.pl',
|
||||||
|
't/035_conflicts.pl',
|
||||||
't/100_bugs.pl',
|
't/100_bugs.pl',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
|
113
src/test/subscription/t/035_conflicts.pl
Normal file
113
src/test/subscription/t/035_conflicts.pl
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
# Copyright (c) 2025, PostgreSQL Global Development Group
|
||||||
|
|
||||||
|
# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
|
||||||
|
use strict;
|
||||||
|
use warnings FATAL => 'all';
|
||||||
|
use PostgreSQL::Test::Cluster;
|
||||||
|
use PostgreSQL::Test::Utils;
|
||||||
|
use Test::More;
|
||||||
|
|
||||||
|
###############################
|
||||||
|
# Setup
|
||||||
|
###############################
|
||||||
|
|
||||||
|
# Create a publisher node
|
||||||
|
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
|
||||||
|
$node_publisher->init(allows_streaming => 'logical');
|
||||||
|
$node_publisher->start;
|
||||||
|
|
||||||
|
# Create a subscriber node
|
||||||
|
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
|
||||||
|
$node_subscriber->init;
|
||||||
|
$node_subscriber->start;
|
||||||
|
|
||||||
|
# Create a table on publisher
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
|
||||||
|
|
||||||
|
# Create same table on subscriber
|
||||||
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
"CREATE TABLE conf_tab (a int PRIMARY key, b int UNIQUE, c int UNIQUE);");
|
||||||
|
|
||||||
|
# Setup logical replication
|
||||||
|
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
|
||||||
|
|
||||||
|
# Create the subscription
|
||||||
|
my $appname = 'sub_tab';
|
||||||
|
$node_subscriber->safe_psql(
|
||||||
|
'postgres',
|
||||||
|
"CREATE SUBSCRIPTION sub_tab
|
||||||
|
CONNECTION '$publisher_connstr application_name=$appname'
|
||||||
|
PUBLICATION pub_tab;");
|
||||||
|
|
||||||
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# INSERT data on Pub and Sub
|
||||||
|
##################################################
|
||||||
|
|
||||||
|
# Insert data in the publisher table
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO conf_tab VALUES (1,1,1);");
|
||||||
|
|
||||||
|
# Insert data in the subscriber table
|
||||||
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
"INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Test multiple_unique_conflicts due to INSERT
|
||||||
|
##################################################
|
||||||
|
my $log_offset = -s $node_subscriber->logfile;
|
||||||
|
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO conf_tab VALUES (2,3,4);");
|
||||||
|
|
||||||
|
# Confirm that this causes an error on the subscriber
|
||||||
|
$node_subscriber->wait_for_log(
|
||||||
|
qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
|
||||||
|
.*Key already exists in unique index \"conf_tab_pkey\".*
|
||||||
|
.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\).*
|
||||||
|
.*Key already exists in unique index \"conf_tab_b_key\".*
|
||||||
|
.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\).*
|
||||||
|
.*Key already exists in unique index \"conf_tab_c_key\".*
|
||||||
|
.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
|
||||||
|
$log_offset);
|
||||||
|
|
||||||
|
pass('multiple_unique_conflicts detected during update');
|
||||||
|
|
||||||
|
# Truncate table to get rid of the error
|
||||||
|
$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Test multiple_unique_conflicts due to UPDATE
|
||||||
|
##################################################
|
||||||
|
$log_offset = -s $node_subscriber->logfile;
|
||||||
|
|
||||||
|
# Insert data in the publisher table
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO conf_tab VALUES (5,5,5);");
|
||||||
|
|
||||||
|
# Insert data in the subscriber table
|
||||||
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
"INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
|
||||||
|
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
|
||||||
|
|
||||||
|
# Confirm that this causes an error on the subscriber
|
||||||
|
$node_subscriber->wait_for_log(
|
||||||
|
qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
|
||||||
|
.*Key already exists in unique index \"conf_tab_pkey\".*
|
||||||
|
.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\).*
|
||||||
|
.*Key already exists in unique index \"conf_tab_b_key\".*
|
||||||
|
.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\).*
|
||||||
|
.*Key already exists in unique index \"conf_tab_c_key\".*
|
||||||
|
.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
|
||||||
|
$log_offset);
|
||||||
|
|
||||||
|
pass('multiple_unique_conflicts detected during insert');
|
||||||
|
|
||||||
|
done_testing();
|
@ -480,6 +480,7 @@ ConditionVariableMinimallyPadded
|
|||||||
ConditionalStack
|
ConditionalStack
|
||||||
ConfigData
|
ConfigData
|
||||||
ConfigVariable
|
ConfigVariable
|
||||||
|
ConflictTupleInfo
|
||||||
ConflictType
|
ConflictType
|
||||||
ConnCacheEntry
|
ConnCacheEntry
|
||||||
ConnCacheKey
|
ConnCacheKey
|
||||||
|
Loading…
x
Reference in New Issue
Block a user