1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-21 12:05:57 +03:00

Fix reporting of constraint violations for table partitioning.

After a tuple is routed to a partition, it has been converted from the
root table's row type to the partition's row type.  ExecConstraints
needs to report the failure using the original tuple and the parent's
tuple descriptor rather than the ones for the selected partition.

Amit Langote
This commit is contained in:
Robert Haas 2017-01-04 14:36:34 -05:00
parent 3e353a7bc2
commit f1b4c771ea
8 changed files with 86 additions and 26 deletions

View File

@ -2426,6 +2426,7 @@ CopyFrom(CopyState cstate)
cstate->rel, cstate->rel,
1, /* dummy rangetable index */ 1, /* dummy rangetable index */
true, /* do load partition check expression */ true, /* do load partition check expression */
NULL,
0); 0);
ExecOpenIndices(resultRelInfo, false); ExecOpenIndices(resultRelInfo, false);
@ -2491,7 +2492,7 @@ CopyFrom(CopyState cstate)
for (;;) for (;;)
{ {
TupleTableSlot *slot, TupleTableSlot *slot,
*oldslot = NULL; *oldslot;
bool skip_tuple; bool skip_tuple;
Oid loaded_oid = InvalidOid; Oid loaded_oid = InvalidOid;
@ -2533,6 +2534,7 @@ CopyFrom(CopyState cstate)
ExecStoreTuple(tuple, slot, InvalidBuffer, false); ExecStoreTuple(tuple, slot, InvalidBuffer, false);
/* Determine the partition to heap_insert the tuple into */ /* Determine the partition to heap_insert the tuple into */
oldslot = slot;
if (cstate->partition_dispatch_info) if (cstate->partition_dispatch_info)
{ {
int leaf_part_index; int leaf_part_index;
@ -2587,7 +2589,6 @@ CopyFrom(CopyState cstate)
* point on. Use a dedicated slot from this point on until * point on. Use a dedicated slot from this point on until
* we're finished dealing with the partition. * we're finished dealing with the partition.
*/ */
oldslot = slot;
slot = cstate->partition_tuple_slot; slot = cstate->partition_tuple_slot;
Assert(slot != NULL); Assert(slot != NULL);
ExecSetSlotDescriptor(slot, RelationGetDescr(partrel)); ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
@ -2624,7 +2625,7 @@ CopyFrom(CopyState cstate)
/* Check the constraints of the tuple */ /* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr || if (cstate->rel->rd_att->constr ||
resultRelInfo->ri_PartitionCheck) resultRelInfo->ri_PartitionCheck)
ExecConstraints(resultRelInfo, slot, estate); ExecConstraints(resultRelInfo, slot, oldslot, estate);
if (useHeapMultiInsert) if (useHeapMultiInsert)
{ {
@ -2686,10 +2687,6 @@ CopyFrom(CopyState cstate)
{ {
resultRelInfo = saved_resultRelInfo; resultRelInfo = saved_resultRelInfo;
estate->es_result_relation_info = resultRelInfo; estate->es_result_relation_info = resultRelInfo;
/* Switch back to the slot corresponding to the root table */
Assert(oldslot != NULL);
slot = oldslot;
} }
} }
} }

View File

@ -1324,6 +1324,7 @@ ExecuteTruncate(TruncateStmt *stmt)
rel, rel,
0, /* dummy rangetable index */ 0, /* dummy rangetable index */
false, false,
NULL,
0); 0);
resultRelInfo++; resultRelInfo++;
} }

View File

@ -828,6 +828,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
resultRelation, resultRelation,
resultRelationIndex, resultRelationIndex,
true, true,
NULL,
estate->es_instrument); estate->es_instrument);
resultRelInfo++; resultRelInfo++;
} }
@ -1218,6 +1219,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
Relation resultRelationDesc, Relation resultRelationDesc,
Index resultRelationIndex, Index resultRelationIndex,
bool load_partition_check, bool load_partition_check,
Relation partition_root,
int instrument_options) int instrument_options)
{ {
MemSet(resultRelInfo, 0, sizeof(ResultRelInfo)); MemSet(resultRelInfo, 0, sizeof(ResultRelInfo));
@ -1259,6 +1261,11 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_PartitionCheck = resultRelInfo->ri_PartitionCheck =
RelationGetPartitionQual(resultRelationDesc, RelationGetPartitionQual(resultRelationDesc,
true); true);
/*
* The following gets set to NULL unless we are initializing leaf
* partitions for tuple-routing.
*/
resultRelInfo->ri_PartitionRoot = partition_root;
} }
/* /*
@ -1322,6 +1329,7 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
rel, rel,
0, /* dummy rangetable index */ 0, /* dummy rangetable index */
true, true,
NULL,
estate->es_instrument); estate->es_instrument);
estate->es_trig_target_relations = estate->es_trig_target_relations =
lappend(estate->es_trig_target_relations, rInfo); lappend(estate->es_trig_target_relations, rInfo);
@ -1743,9 +1751,21 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
return ExecQual(resultRelInfo->ri_PartitionCheckExpr, econtext, true); return ExecQual(resultRelInfo->ri_PartitionCheckExpr, econtext, true);
} }
/*
* ExecConstraints - check constraints of the tuple in 'slot'
*
* This checks the traditional NOT NULL and check constraints, as well as
* the partition constraint, if any.
*
* Note: 'slot' contains the tuple to check the constraints of, which may
* have been converted from the original input tuple after tuple routing,
* while 'orig_slot' contains the original tuple to be shown in the message,
* if an error occurs.
*/
void void
ExecConstraints(ResultRelInfo *resultRelInfo, ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate) TupleTableSlot *slot, TupleTableSlot *orig_slot,
EState *estate)
{ {
Relation rel = resultRelInfo->ri_RelationDesc; Relation rel = resultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel); TupleDesc tupdesc = RelationGetDescr(rel);
@ -1767,12 +1787,24 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
slot_attisnull(slot, attrChk)) slot_attisnull(slot, attrChk))
{ {
char *val_desc; char *val_desc;
Relation orig_rel = rel;
TupleDesc orig_tupdesc = tupdesc;
/*
* choose the correct relation to build val_desc from the
* tuple contained in orig_slot
*/
if (resultRelInfo->ri_PartitionRoot)
{
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
}
insertedCols = GetInsertedColumns(resultRelInfo, estate); insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate); updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols); modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel), val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot, orig_slot,
tupdesc, tupdesc,
modifiedCols, modifiedCols,
64); 64);
@ -1780,9 +1812,9 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_NOT_NULL_VIOLATION), (errcode(ERRCODE_NOT_NULL_VIOLATION),
errmsg("null value in column \"%s\" violates not-null constraint", errmsg("null value in column \"%s\" violates not-null constraint",
NameStr(tupdesc->attrs[attrChk - 1]->attname)), NameStr(orig_tupdesc->attrs[attrChk - 1]->attname)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0, val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
errtablecol(rel, attrChk))); errtablecol(orig_rel, attrChk)));
} }
} }
} }
@ -1794,21 +1826,29 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
if ((failed = ExecRelCheck(resultRelInfo, slot, estate)) != NULL) if ((failed = ExecRelCheck(resultRelInfo, slot, estate)) != NULL)
{ {
char *val_desc; char *val_desc;
Relation orig_rel = rel;
/* See the comment above. */
if (resultRelInfo->ri_PartitionRoot)
{
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
}
insertedCols = GetInsertedColumns(resultRelInfo, estate); insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate); updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols); modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel), val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot, orig_slot,
tupdesc, tupdesc,
modifiedCols, modifiedCols,
64); 64);
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION), (errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates check constraint \"%s\"", errmsg("new row for relation \"%s\" violates check constraint \"%s\"",
RelationGetRelationName(rel), failed), RelationGetRelationName(orig_rel), failed),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0, val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
errtableconstraint(rel, failed))); errtableconstraint(orig_rel, failed)));
} }
} }
@ -1816,19 +1856,27 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
!ExecPartitionCheck(resultRelInfo, slot, estate)) !ExecPartitionCheck(resultRelInfo, slot, estate))
{ {
char *val_desc; char *val_desc;
Relation orig_rel = rel;
/* See the comment above. */
if (resultRelInfo->ri_PartitionRoot)
{
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
}
insertedCols = GetInsertedColumns(resultRelInfo, estate); insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate); updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols); modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel), val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot, orig_slot,
tupdesc, tupdesc,
modifiedCols, modifiedCols,
64); 64);
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION), (errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates partition constraint", errmsg("new row for relation \"%s\" violates partition constraint",
RelationGetRelationName(rel)), RelationGetRelationName(orig_rel)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0)); val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
} }
} }
@ -3086,6 +3134,7 @@ ExecSetupPartitionTupleRouting(Relation rel,
partrel, partrel,
1, /* dummy */ 1, /* dummy */
false, false,
rel,
0); 0);
/* /*

View File

@ -262,7 +262,7 @@ ExecInsert(ModifyTableState *mtstate,
Relation resultRelationDesc; Relation resultRelationDesc;
Oid newId; Oid newId;
List *recheckIndexes = NIL; List *recheckIndexes = NIL;
TupleTableSlot *oldslot = NULL; TupleTableSlot *oldslot = slot;
/* /*
* get the heap tuple out of the tuple table slot, making sure we have a * get the heap tuple out of the tuple table slot, making sure we have a
@ -328,7 +328,6 @@ ExecInsert(ModifyTableState *mtstate,
* point on, until we're finished dealing with the partition. * point on, until we're finished dealing with the partition.
* Use the dedicated slot for that. * Use the dedicated slot for that.
*/ */
oldslot = slot;
slot = mtstate->mt_partition_tuple_slot; slot = mtstate->mt_partition_tuple_slot;
Assert(slot != NULL); Assert(slot != NULL);
ExecSetSlotDescriptor(slot, RelationGetDescr(partrel)); ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
@ -434,7 +433,7 @@ ExecInsert(ModifyTableState *mtstate,
* Check the constraints of the tuple * Check the constraints of the tuple
*/ */
if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck) if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
ExecConstraints(resultRelInfo, slot, estate); ExecConstraints(resultRelInfo, slot, oldslot, estate);
if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0) if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0)
{ {
@ -579,10 +578,6 @@ ExecInsert(ModifyTableState *mtstate,
{ {
resultRelInfo = saved_resultRelInfo; resultRelInfo = saved_resultRelInfo;
estate->es_result_relation_info = resultRelInfo; estate->es_result_relation_info = resultRelInfo;
/* Switch back to the slot corresponding to the root table */
Assert(oldslot != NULL);
slot = oldslot;
} }
/* /*
@ -994,10 +989,12 @@ lreplace:;
resultRelInfo, slot, estate); resultRelInfo, slot, estate);
/* /*
* Check the constraints of the tuple * Check the constraints of the tuple. Note that we pass the same
* slot for the orig_slot argument, because unlike ExecInsert(), no
* tuple-routing is performed here, hence the slot remains unchanged.
*/ */
if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck) if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
ExecConstraints(resultRelInfo, slot, estate); ExecConstraints(resultRelInfo, slot, slot, estate);
/* /*
* replace the heap tuple * replace the heap tuple

View File

@ -190,11 +190,13 @@ extern void InitResultRelInfo(ResultRelInfo *resultRelInfo,
Relation resultRelationDesc, Relation resultRelationDesc,
Index resultRelationIndex, Index resultRelationIndex,
bool load_partition_check, bool load_partition_check,
Relation partition_root,
int instrument_options); int instrument_options);
extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid); extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid);
extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids); extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
extern void ExecConstraints(ResultRelInfo *resultRelInfo, extern void ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate); TupleTableSlot *slot, TupleTableSlot *orig_slot,
EState *estate);
extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo, extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate); TupleTableSlot *slot, EState *estate);
extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo); extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);

View File

@ -349,6 +349,7 @@ typedef struct ResultRelInfo
List *ri_onConflictSetWhere; List *ri_onConflictSetWhere;
List *ri_PartitionCheck; List *ri_PartitionCheck;
List *ri_PartitionCheckExpr; List *ri_PartitionCheckExpr;
Relation ri_PartitionRoot;
} ResultRelInfo; } ResultRelInfo;
/* ---------------- /* ----------------

View File

@ -333,5 +333,12 @@ select tableoid::regclass, * from p;
p11 | 1 | 2 p11 | 1 | 2
(1 row) (1 row)
truncate p;
alter table p add constraint check_b check (b = 3);
-- check that correct input row is shown when constraint check_b fails on p11
-- after "(1, 2)" is routed to it
insert into p values (1, 2);
ERROR: new row for relation "p11" violates check constraint "check_b"
DETAIL: Failing row contains (1, 2).
-- cleanup -- cleanup
drop table p, p1, p11; drop table p, p1, p11;

View File

@ -195,5 +195,11 @@ alter table p attach partition p1 for values from (1, 2) to (1, 10);
insert into p values (1, 2); insert into p values (1, 2);
select tableoid::regclass, * from p; select tableoid::regclass, * from p;
truncate p;
alter table p add constraint check_b check (b = 3);
-- check that correct input row is shown when constraint check_b fails on p11
-- after "(1, 2)" is routed to it
insert into p values (1, 2);
-- cleanup -- cleanup
drop table p, p1, p11; drop table p, p1, p11;