diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 0fcbc660b31..99b95bbdb43 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -379,9 +379,6 @@ WITH ( MODULUS numeric_literal, REM
Partitioned tables do not support EXCLUDE constraints;
however, you can define these constraints on individual partitions.
- Also, while it's possible to define PRIMARY KEY
- constraints on partitioned tables, creating foreign keys that
- reference a partitioned table is not yet supported.
@@ -1028,9 +1025,7 @@ WITH ( MODULUS numeric_literal, REM
addition of a foreign key constraint requires a
SHARE ROW EXCLUSIVE lock on the referenced table.
Note that foreign key constraints cannot be defined between temporary
- tables and permanent tables. Also note that while it is possible to
- define a foreign key on a partitioned table, it is not possible to
- declare a foreign key that references a partitioned table.
+ tables and permanent tables.
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 654179297cf..978b6bec44a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -415,10 +415,32 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
Relation rel, Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing,
LOCKMODE lockmode);
-static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
- List **cloned);
-static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
- Relation partRel, List *clone, List **cloned);
+static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
+ Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks, int16 *pkattnum, int16 *fkattnum,
+ Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+ bool old_check_ok);
+static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
+ Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks, int16 *pkattnum, int16 *fkattnum,
+ Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+ bool old_check_ok, LOCKMODE lockmode);
+static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
+ Relation partitionRel);
+static void CloneFkReferenced(Relation parentRel, Relation partitionRel);
+static void CloneFkReferencing(List **wqueue, Relation parentRel,
+ Relation partRel);
+static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+ Constraint *fkconstraint, Oid constraintOid,
+ Oid indexOid);
+static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
+ Constraint *fkconstraint, Oid constraintOid,
+ Oid indexOid);
+static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+ Oid partRelid,
+ Oid parentConstrOid, int numfks,
+ AttrNumber *mapped_conkey, AttrNumber *confkey,
+ Oid *conpfeqop);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
@@ -501,6 +523,8 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
static void update_relispartition(Relation classRel, Oid relationId,
bool newval);
+static List *GetParentedForeignKeyRefs(Relation partition);
+static void ATDetachCheckNoForeignKeyRefs(Relation partition);
/* ----------------------------------------------------------------
@@ -1083,7 +1107,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
* And foreign keys too. Note that because we're freshly creating the
* table, there is no need to verify these new constraints.
*/
- CloneForeignKeyConstraints(parentId, relationId, NULL);
+ CloneForeignKeyConstraints(NULL, parent, rel);
table_close(parent, NoLock);
}
@@ -3563,7 +3587,8 @@ AlterTableGetLockLevel(List *cmds)
/*
* Removing constraints can affect SELECTs that have been
- * optimised assuming the constraint holds true.
+ * optimised assuming the constraint holds true. See also
+ * CloneFkReferenced.
*/
case AT_DropConstraint: /* as DROP INDEX */
case AT_DropNotNull: /* may change some SQL plans */
@@ -7224,9 +7249,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
case CONSTR_FOREIGN:
/*
- * Note that we currently never recurse for FK constraints, so the
- * "recurse" flag is silently ignored.
- *
* Assign or validate constraint name
*/
if (newConstraint->conname)
@@ -7444,6 +7466,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Subroutine for ATExecAddConstraint. Must already hold exclusive
* lock on the rel, and have done appropriate validity checks for it.
* We do permissions checks here, however.
+ *
+ * When the referenced or referencing tables (or both) are partitioned,
+ * multiple pg_constraint rows are required -- one for each partitioned table
+ * and each partition on each side (fortunately, not one for every combination
+ * thereof). We also need action triggers on each leaf partition on the
+ * referenced side, and check triggers on each leaf partition on the
+ * referencing side.
*/
static ObjectAddress
ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
@@ -7459,12 +7488,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid pfeqoperators[INDEX_MAX_KEYS];
Oid ppeqoperators[INDEX_MAX_KEYS];
Oid ffeqoperators[INDEX_MAX_KEYS];
- bool connoinherit;
int i;
int numfks,
numpks;
Oid indexOid;
- Oid constrOid;
bool old_check_ok;
ObjectAddress address;
ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
@@ -7482,12 +7509,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Validity checks (permission checks wait till we have the column
* numbers)
*/
- if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot reference partitioned table \"%s\"",
- RelationGetRelationName(pkrel))));
-
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
if (!recurse)
@@ -7505,7 +7526,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
errdetail("This feature is not yet supported on partitioned tables.")));
}
- if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+ if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+ pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("referenced relation \"%s\" is not a table",
@@ -7741,8 +7763,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("foreign key constraint \"%s\" "
- "cannot be implemented",
+ errmsg("foreign key constraint \"%s\" cannot be implemented",
fkconstraint->conname),
errdetail("Key columns \"%s\" and \"%s\" "
"are of incompatible types: %s and %s.",
@@ -7830,15 +7851,126 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
}
/*
- * FKs always inherit for partitioned tables, and never for legacy
- * inheritance.
+ * Create all the constraint and trigger objects, recursing to partitions
+ * as necessary. First handle the referenced side.
*/
- connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+ address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
+ indexOid,
+ InvalidOid, /* no parent constraint */
+ numfks,
+ pkattnum,
+ fkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ old_check_ok);
+
+ /* Now handle the referencing side. */
+ addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
+ indexOid,
+ address.objectId,
+ numfks,
+ pkattnum,
+ fkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ old_check_ok,
+ lockmode);
+
+ /*
+ * Done. Close pk table, but keep lock until we've committed.
+ */
+ table_close(pkrel, NoLock);
+
+ return address;
+}
+
+/*
+ * addFkRecurseReferenced
+ * subroutine for ATAddForeignKeyConstraint; recurses on the referenced
+ * side of the constraint
+ *
+ * Create pg_constraint rows for the referenced side of the constraint,
+ * referencing the parent of the referencing side; also create action triggers
+ * on leaf partitions. If the table is partitioned, recurse to handle each
+ * partition.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the root referencing relation.
+ * pkrel is the referenced relation; might be a partition, if recursing.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of a parent constraint; InvalidOid if this is a
+ * top-level constraint.
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ */
+static ObjectAddress
+addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
+ Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks,
+ int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+ Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
+{
+ ObjectAddress address;
+ Oid constrOid;
+ char *conname;
+ bool conislocal;
+ int coninhcount;
+ bool connoinherit;
+
+ /*
+ * Verify relkind for each referenced partition. At the top level, this
+ * is redundant with a previous check, but we need it when recursing.
+ */
+ if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+ pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("referenced relation \"%s\" is not a table",
+ RelationGetRelationName(pkrel))));
+
+ /*
+ * Caller supplies us with a constraint name; however, it may be used in
+ * this partition, so come up with a different one in that case.
+ */
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(rel),
+ fkconstraint->conname))
+ conname = ChooseConstraintName(RelationGetRelationName(rel),
+ ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+ "fkey",
+ RelationGetNamespace(rel), NIL);
+ else
+ conname = fkconstraint->conname;
+
+ if (OidIsValid(parentConstr))
+ {
+ conislocal = false;
+ coninhcount = 1;
+ connoinherit = false;
+ }
+ else
+ {
+ conislocal = true;
+ coninhcount = 0;
+
+ /*
+ * always inherit for partitioned tables, never for legacy inheritance
+ */
+ connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+ }
/*
* Record the FK constraint in pg_constraint.
*/
- constrOid = CreateConstraintEntry(fkconstraint->conname,
+ constrOid = CreateConstraintEntry(conname,
RelationGetNamespace(rel),
CONSTRAINT_FOREIGN,
fkconstraint->deferrable,
@@ -7856,108 +7988,317 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
pfeqoperators,
ppeqoperators,
ffeqoperators,
- numpks,
+ numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkconstraint->fk_matchtype,
NULL, /* no exclusion constraint */
NULL, /* no check constraint */
NULL,
- true, /* islocal */
- 0, /* inhcount */
- connoinherit, /* conNoInherit */
+ conislocal, /* islocal */
+ coninhcount, /* inhcount */
+ connoinherit, /* conNoInherit */
false); /* is_internal */
+
ObjectAddressSet(address, ConstraintRelationId, constrOid);
/*
- * Create the triggers that will enforce the constraint. We only want the
- * action triggers to appear for the parent partitioned relation, even
- * though the constraints also exist below.
+ * Mark the child constraint as part of the parent constraint; it must not
+ * be dropped on its own. (This constraint is deleted when the partition
+ * is detached, but a special check needs to occur that the partition
+ * contains no referenced values.)
*/
- createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
- constrOid, indexOid, !recursing);
+ if (OidIsValid(parentConstr))
+ {
+ ObjectAddress referenced;
+
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_INTERNAL);
+ }
+
+ /* make new constraint visible, in case we add more */
+ CommandCounterIncrement();
/*
- * Tell Phase 3 to check that the constraint is satisfied by existing
- * rows. We can skip this during table creation, when requested explicitly
- * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
- * recreating a constraint following a SET DATA TYPE operation that did
- * not impugn its validity.
+ * If the referenced table is a plain relation, create the action triggers
+ * that enforce the constraint.
*/
- if (!old_check_ok && !fkconstraint->skip_validation)
+ if (pkrel->rd_rel->relkind == RELKIND_RELATION)
{
- NewConstraint *newcon;
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = fkconstraint->conname;
- newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = RelationGetRelid(pkrel);
- newcon->refindid = indexOid;
- newcon->conid = constrOid;
- newcon->qual = (Node *) fkconstraint;
-
- tab->constraints = lappend(tab->constraints, newcon);
+ createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
+ fkconstraint,
+ constrOid, indexOid);
}
/*
- * When called on a partitioned table, recurse to create the constraint on
- * the partitions also.
+ * If the referenced table is partitioned, recurse on ourselves to handle
+ * each partition. We need one pg_constraint row created for each
+ * partition in addition to the pg_constraint row for the parent table.
*/
- if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
- PartitionDesc partdesc;
- Relation pg_constraint;
- List *cloned = NIL;
- ListCell *cell;
+ PartitionDesc pd = RelationGetPartitionDesc(pkrel);
- pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
- partdesc = RelationGetPartitionDesc(rel);
-
- for (i = 0; i < partdesc->nparts; i++)
+ for (int i = 0; i < pd->nparts; i++)
{
- Oid partitionId = partdesc->oids[i];
+ Relation partRel;
+ AttrNumber *map;
+ AttrNumber *mapped_pkattnum;
+ Oid partIndexId;
+
+ partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
+
+ /*
+ * Map the attribute numbers in the referenced side of the FK
+ * definition to match the partition's column layout.
+ */
+ map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+ RelationGetDescr(pkrel),
+ gettext_noop("could not convert row type"));
+ if (map)
+ {
+ mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
+ for (int j = 0; j < numfks; j++)
+ mapped_pkattnum[j] = map[pkattnum[j] - 1];
+ }
+ else
+ mapped_pkattnum = pkattnum;
+
+ /* do the deed */
+ partIndexId = index_get_partition(partRel, indexOid);
+ if (!OidIsValid(partIndexId))
+ elog(ERROR, "index for %u not found in partition %s",
+ indexOid, RelationGetRelationName(partRel));
+ addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
+ partIndexId, constrOid, numfks,
+ mapped_pkattnum, fkattnum,
+ pfeqoperators, ppeqoperators, ffeqoperators,
+ old_check_ok);
+
+ /* Done -- clean up (but keep the lock) */
+ table_close(partRel, NoLock);
+ if (map)
+ {
+ pfree(mapped_pkattnum);
+ pfree(map);
+ }
+ }
+ }
+
+ return address;
+}
+
+/*
+ * addFkRecurseReferencing
+ * subroutine for ATAddForeignKeyConstraint and CloneFkReferencing
+ *
+ * If the referencing relation is a plain relation, create the necessary check
+ * triggers that implement the constraint, and set up for Phase 3 constraint
+ * verification. If the referencing relation is a partitioned table, then
+ * we create a pg_constraint row for it and recurse on this routine for each
+ * partition.
+ *
+ * We assume that the referenced relation is locked against concurrent
+ * deletions. If it's a partitioned relation, every partition must be so
+ * locked.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the referencing relation; might be a partition, if recursing.
+ * pkrel is the root referenced relation.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of the parent constraint (there is always one).
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ * lockmode is the lockmode to acquire on partitions when recursing.
+ */
+static void
+addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
+ Relation pkrel, Oid indexOid, Oid parentConstr,
+ int numfks, int16 *pkattnum, int16 *fkattnum,
+ Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+ bool old_check_ok, LOCKMODE lockmode)
+{
+ AssertArg(OidIsValid(parentConstr));
+
+ if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("foreign keys constraints are not supported on foreign tables")));
+
+ /*
+ * If the referencing relation is a plain table, add the check triggers to
+ * it and, if necessary, schedule it to be checked in Phase 3.
+ *
+ * If the relation is partitioned, drill down to do it to its partitions.
+ */
+ if (rel->rd_rel->relkind == RELKIND_RELATION)
+ {
+ createForeignKeyCheckTriggers(RelationGetRelid(rel),
+ RelationGetRelid(pkrel),
+ fkconstraint,
+ parentConstr,
+ indexOid);
+
+ /*
+ * Tell Phase 3 to check that the constraint is satisfied by existing
+ * rows. We can skip this during table creation, when requested
+ * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
+ * and when we're recreating a constraint following a SET DATA TYPE
+ * operation that did not impugn its validity.
+ */
+ if (wqueue && !old_check_ok && !fkconstraint->skip_validation)
+ {
+ NewConstraint *newcon;
+ AlteredTableInfo *tab;
+
+ tab = ATGetQueueEntry(wqueue, rel);
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = get_constraint_name(parentConstr);
+ newcon->contype = CONSTR_FOREIGN;
+ newcon->refrelid = RelationGetRelid(pkrel);
+ newcon->refindid = indexOid;
+ newcon->conid = parentConstr;
+ newcon->qual = (Node *) fkconstraint;
+
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
+ }
+ else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc pd = RelationGetPartitionDesc(rel);
+
+ /*
+ * Recurse to take appropriate action on each partition; either we
+ * find an existing constraint to reparent to ours, or we create a new
+ * one.
+ */
+ for (int i = 0; i < pd->nparts; i++)
+ {
+ Oid partitionId = pd->oids[i];
Relation partition = table_open(partitionId, lockmode);
+ List *partFKs;
+ AttrNumber *attmap;
+ AttrNumber mapped_fkattnum[INDEX_MAX_KEYS];
+ bool attached;
+ char *conname;
+ Oid constrOid;
+ ObjectAddress address,
+ referenced;
+ ListCell *cell;
CheckTableNotInUse(partition, "ALTER TABLE");
- CloneFkReferencing(pg_constraint, rel, partition,
- list_make1_oid(constrOid),
- &cloned);
+ attmap = convert_tuples_by_name_map(RelationGetDescr(partition),
+ RelationGetDescr(rel),
+ gettext_noop("could not convert row type"));
+ for (int j = 0; j < numfks; j++)
+ mapped_fkattnum[j] = attmap[fkattnum[j] - 1];
+
+ /* Check whether an existing constraint can be repurposed */
+ partFKs = copyObject(RelationGetFKeyList(partition));
+ attached = false;
+ foreach(cell, partFKs)
+ {
+ ForeignKeyCacheInfo *fk;
+
+ fk = lfirst_node(ForeignKeyCacheInfo, cell);
+ if (tryAttachPartitionForeignKey(fk,
+ partitionId,
+ parentConstr,
+ numfks,
+ mapped_fkattnum,
+ pkattnum,
+ pfeqoperators))
+ {
+ attached = true;
+ break;
+ }
+ }
+ if (attached)
+ {
+ table_close(partition, NoLock);
+ continue;
+ }
+
+ /*
+ * No luck finding a good constraint to reuse; create our own.
+ */
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(partition),
+ fkconstraint->conname))
+ conname = ChooseConstraintName(RelationGetRelationName(partition),
+ ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+ "fkey",
+ RelationGetNamespace(partition), NIL);
+ else
+ conname = fkconstraint->conname;
+ constrOid =
+ CreateConstraintEntry(conname,
+ RelationGetNamespace(partition),
+ CONSTRAINT_FOREIGN,
+ fkconstraint->deferrable,
+ fkconstraint->initdeferred,
+ fkconstraint->initially_valid,
+ parentConstr,
+ partitionId,
+ mapped_fkattnum,
+ numfks,
+ numfks,
+ InvalidOid,
+ indexOid,
+ RelationGetRelid(pkrel),
+ pkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ numfks,
+ fkconstraint->fk_upd_action,
+ fkconstraint->fk_del_action,
+ fkconstraint->fk_matchtype,
+ NULL,
+ NULL,
+ NULL,
+ false,
+ 1,
+ false,
+ false);
+
+ /*
+ * Give this constraint partition-type dependencies on the parent
+ * constraint as well as the table.
+ */
+ ObjectAddressSet(address, ConstraintRelationId, constrOid);
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+ ObjectAddressSet(referenced, RelationRelationId, partitionId);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
+
+ /* Make all this visible before recursing */
+ CommandCounterIncrement();
+
+ /* call ourselves to finalize the creation and we're done */
+ addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel,
+ indexOid,
+ constrOid,
+ numfks,
+ pkattnum,
+ mapped_fkattnum,
+ pfeqoperators,
+ ppeqoperators,
+ ffeqoperators,
+ old_check_ok,
+ lockmode);
table_close(partition, NoLock);
}
- table_close(pg_constraint, RowExclusiveLock);
-
- foreach(cell, cloned)
- {
- ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell);
- Relation partition = table_open(cc->relid, lockmode);
- AlteredTableInfo *childtab;
- NewConstraint *newcon;
-
- /* Find or create work queue entry for this partition */
- childtab = ATGetQueueEntry(wqueue, partition);
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = cc->constraint->conname;
- newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = cc->refrelid;
- newcon->refindid = cc->conindid;
- newcon->conid = cc->conid;
- newcon->qual = (Node *) fkconstraint;
-
- childtab->constraints = lappend(childtab->constraints, newcon);
-
- table_close(partition, lockmode);
- }
}
-
- /*
- * Close pk table, but keep lock until we've committed.
- */
- table_close(pkrel, NoLock);
-
- return address;
}
/*
@@ -7965,77 +8306,219 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Clone foreign keys from a partitioned table to a newly acquired
* partition.
*
- * relationId is a partition of parentId, so we can be certain that it has the
- * same columns with the same datatypes. The columns may be in different
+ * partitionRel is a partition of parentRel, so we can be certain that it has
+ * the same columns with the same datatypes. The columns may be in different
* order, though.
*
- * The *cloned list is appended ClonedConstraint elements describing what was
- * created, for the purposes of validating the constraint in ALTER TABLE's
- * Phase 3.
+ * wqueue must be passed to set up phase 3 constraint checking, unless the
+ * referencing-side partition is known to be empty (such as in CREATE TABLE /
+ * PARTITION OF).
*/
static void
-CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
+ Relation partitionRel)
+{
+ /* This only works for declarative partitioning */
+ Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
+ /*
+ * Clone constraints for which the parent is on the referenced side.
+ */
+ CloneFkReferenced(parentRel, partitionRel);
+
+ /*
+ * Now clone constraints where the parent is on the referencing side.
+ */
+ CloneFkReferencing(wqueue, parentRel, partitionRel);
+}
+
+/*
+ * CloneFkReferenced
+ * Subroutine for CloneForeignKeyConstraints
+ *
+ * Find all the FKs that have the parent relation on the referenced side;
+ * clone those constraints to the given partition. This is to be called
+ * when the partition is being created or attached.
+ *
+ * This recurses to partitions, if the relation being attached is partitioned.
+ * Recursion is done by calling addFkRecurseReferenced.
+ */
+static void
+CloneFkReferenced(Relation parentRel, Relation partitionRel)
{
Relation pg_constraint;
- Relation parentRel;
- Relation rel;
- ScanKeyData key;
+ AttrNumber *attmap;
+ ListCell *cell;
SysScanDesc scan;
+ ScanKeyData key[2];
HeapTuple tuple;
List *clone = NIL;
- parentRel = table_open(parentId, NoLock); /* already got lock */
- /* see ATAddForeignKeyConstraint about lock level */
- rel = table_open(relationId, AccessExclusiveLock);
+ /*
+ * Search for any constraints where this partition is in the referenced
+ * side. However, we must ignore any constraint whose parent constraint
+ * is also going to be cloned, to avoid duplicates. So do it in two
+ * steps: first construct the list of constraints to clone, then go over
+ * that list cloning those whose parents are not in the list. (We must
+ * not rely on the parent being seen first, since the catalog scan could
+ * return children first.)
+ */
pg_constraint = table_open(ConstraintRelationId, RowShareLock);
-
- /* Obtain the list of constraints to clone or attach */
- ScanKeyInit(&key,
- Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
- F_OIDEQ, ObjectIdGetDatum(parentId));
- scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
- NULL, 1, &key);
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
+ ScanKeyInit(&key[1],
+ Anum_pg_constraint_contype, BTEqualStrategyNumber,
+ F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+ /* This is a seqscan, as we don't have a usable index ... */
+ scan = systable_beginscan(pg_constraint, InvalidOid, true,
+ NULL, 2, key);
while ((tuple = systable_getnext(scan)) != NULL)
{
- Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+ Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
- clone = lappend_oid(clone, oid);
+ /* Only try to clone the top-level constraint; skip child ones. */
+ if (constrForm->conparentid != InvalidOid)
+ continue;
+
+ clone = lappend_oid(clone, constrForm->oid);
}
systable_endscan(scan);
-
- /* Do the actual work, recursing to partitions as needed */
- CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
-
- /* We're done. Clean up */
- table_close(parentRel, NoLock);
- table_close(rel, NoLock); /* keep lock till commit */
table_close(pg_constraint, RowShareLock);
+
+ attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
+ RelationGetDescr(parentRel),
+ gettext_noop("could not convert row type"));
+ foreach(cell, clone)
+ {
+ Oid constrOid = lfirst_oid(cell);
+ Form_pg_constraint constrForm;
+ Relation fkRel;
+ Oid indexOid;
+ Oid partIndexId;
+ int numfks;
+ AttrNumber conkey[INDEX_MAX_KEYS];
+ AttrNumber mapped_confkey[INDEX_MAX_KEYS];
+ AttrNumber confkey[INDEX_MAX_KEYS];
+ Oid conpfeqop[INDEX_MAX_KEYS];
+ Oid conppeqop[INDEX_MAX_KEYS];
+ Oid conffeqop[INDEX_MAX_KEYS];
+ Constraint *fkconstraint;
+
+ tuple = SearchSysCache1(CONSTROID, constrOid);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+ constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /*
+ * Because we're only expanding the key space at the referenced side,
+ * we don't need to prevent any operation in the referencing table, so
+ * AccessShareLock suffices (assumes that dropping the constraint
+ * acquires AEL).
+ */
+ fkRel = table_open(constrForm->conrelid, AccessShareLock);
+
+ indexOid = constrForm->conindid;
+ DeconstructFkConstraintRow(tuple,
+ &numfks,
+ conkey,
+ confkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop);
+ for (int i = 0; i < numfks; i++)
+ mapped_confkey[i] = attmap[confkey[i] - 1];
+
+ fkconstraint = makeNode(Constraint);
+ /* for now this is all we need */
+ fkconstraint->conname = NameStr(constrForm->conname);
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+ fkconstraint->initially_valid = true;
+ fkconstraint->fk_matchtype = constrForm->confmatchtype;
+
+ /* set up colnames that are used to generate the constraint name */
+ for (int i = 0; i < numfks; i++)
+ {
+ Form_pg_attribute att;
+
+ att = TupleDescAttr(RelationGetDescr(fkRel),
+ conkey[i] - 1);
+ fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+ makeString(NameStr(att->attname)));
+ }
+
+ /*
+ * Add the new foreign key constraint pointing to the new partition.
+ * Because this new partition appears in the referenced side of the
+ * constraint, we don't need to set up for Phase 3 check.
+ */
+ partIndexId = index_get_partition(partitionRel, indexOid);
+ if (!OidIsValid(partIndexId))
+ elog(ERROR, "index for %u not found in partition %s",
+ indexOid, RelationGetRelationName(partitionRel));
+ addFkRecurseReferenced(NULL,
+ fkconstraint,
+ fkRel,
+ partitionRel,
+ partIndexId,
+ constrOid,
+ numfks,
+ mapped_confkey,
+ conkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ true);
+
+ table_close(fkRel, NoLock);
+ ReleaseSysCache(tuple);
+ }
}
/*
* CloneFkReferencing
- * Recursive subroutine for CloneForeignKeyConstraints, referencing side
+ * Subroutine for CloneForeignKeyConstraints
*
- * Clone the given list of FK constraints when a partition is attached on the
- * referencing side of those constraints.
+ * For each FK constraint of the parent relation in the given list, find an
+ * equivalent constraint in its partition relation that can be reparented;
+ * if one cannot be found, create a new constraint in the partition as its
+ * child.
*
- * When cloning foreign keys to a partition, it may happen that equivalent
- * constraints already exist in the partition for some of them. We can skip
- * creating a clone in that case, and instead just attach the existing
- * constraint to the one in the parent.
- *
- * This function recurses to partitions, if the new partition is partitioned;
- * of course, only do this for FKs that were actually cloned.
+ * If wqueue is given, it is used to set up phase-3 verification for each
+ * cloned constraint; if omitted, we assume that such verification is not
+ * needed (example: the partition is being created anew).
*/
static void
-CloneFkReferencing(Relation pg_constraint, Relation parentRel,
- Relation partRel, List *clone, List **cloned)
+CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
{
AttrNumber *attmap;
List *partFKs;
- List *subclone = NIL;
+ List *clone = NIL;
ListCell *cell;
+ /* obtain a list of constraints that we need to clone */
+ foreach(cell, RelationGetFKeyList(parentRel))
+ {
+ ForeignKeyCacheInfo *fk = lfirst(cell);
+
+ clone = lappend_oid(clone, fk->conoid);
+ }
+
+ /*
+ * Silently do nothing if there's nothing to do. In particular, this
+ * avoids throwing a spurious error for foreign tables.
+ */
+ if (clone == NIL)
+ return;
+
+ if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("foreign keys constraints are not supported on foreign tables")));
+
/*
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
@@ -8050,6 +8533,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
{
Oid parentConstrOid = lfirst_oid(cell);
Form_pg_constraint constrForm;
+ Relation pkrel;
HeapTuple tuple;
int numfks;
AttrNumber conkey[INDEX_MAX_KEYS];
@@ -8059,13 +8543,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
- bool attach_it;
+ bool attached;
+ Oid indexOid;
Oid constrOid;
- ObjectAddress parentAddr,
- childAddr,
- childTableAddr;
+ ObjectAddress address,
+ referenced;
ListCell *cell;
- int i;
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
if (!tuple)
@@ -8073,161 +8556,92 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
parentConstrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
- /* only foreign keys */
- if (constrForm->contype != CONSTRAINT_FOREIGN)
+ /* Don't clone constraints whose parents are being cloned */
+ if (list_member_oid(clone, constrForm->conparentid))
{
ReleaseSysCache(tuple);
continue;
}
- ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+ /*
+ * Need to prevent concurrent deletions. If pkrel is a partitioned
+ * relation, that means to lock all partitions.
+ */
+ pkrel = table_open(constrForm->confrelid, ShareRowExclusiveLock);
+ if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ (void) find_all_inheritors(RelationGetRelid(pkrel),
+ ShareRowExclusiveLock, NULL);
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
- for (i = 0; i < numfks; i++)
+ for (int i = 0; i < numfks; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
/*
* Before creating a new constraint, see whether any existing FKs are
- * fit for the purpose. If one is, attach the parent constraint to it,
- * and don't clone anything. This way we avoid the expensive
- * verification step and don't end up with a duplicate FK. This also
- * means we don't consider this constraint when recursing to
- * partitions.
+ * fit for the purpose. If one is, attach the parent constraint to
+ * it, and don't clone anything. This way we avoid the expensive
+ * verification step and don't end up with a duplicate FK, and we
+ * don't need to recurse to partitions for this constraint.
*/
- attach_it = false;
+ attached = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
- Form_pg_constraint partConstr;
- HeapTuple partcontup;
- Relation trigrel;
- HeapTuple trigtup;
- SysScanDesc scan;
- ScanKeyData key;
- attach_it = true;
-
- /*
- * Do some quick & easy initial checks. If any of these fail, we
- * cannot use this constraint, but keep looking.
- */
- if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
+ if (tryAttachPartitionForeignKey(fk,
+ RelationGetRelid(partRel),
+ parentConstrOid,
+ numfks,
+ mapped_conkey,
+ confkey,
+ conpfeqop))
{
- attach_it = false;
- continue;
+ attached = true;
+ table_close(pkrel, NoLock);
+ break;
}
- for (i = 0; i < numfks; i++)
- {
- if (fk->conkey[i] != mapped_conkey[i] ||
- fk->confkey[i] != confkey[i] ||
- fk->conpfeqop[i] != conpfeqop[i])
- {
- attach_it = false;
- break;
- }
- }
- if (!attach_it)
- continue;
-
- /*
- * Looks good so far; do some more extensive checks. Presumably
- * the check for 'convalidated' could be dropped, since we don't
- * really care about that, but let's be careful for now.
- */
- partcontup = SearchSysCache1(CONSTROID,
- ObjectIdGetDatum(fk->conoid));
- if (!partcontup)
- elog(ERROR, "cache lookup failed for constraint %u",
- fk->conoid);
- partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
- if (OidIsValid(partConstr->conparentid) ||
- !partConstr->convalidated ||
- partConstr->condeferrable != constrForm->condeferrable ||
- partConstr->condeferred != constrForm->condeferred ||
- partConstr->confupdtype != constrForm->confupdtype ||
- partConstr->confdeltype != constrForm->confdeltype ||
- partConstr->confmatchtype != constrForm->confmatchtype)
- {
- ReleaseSysCache(partcontup);
- attach_it = false;
- continue;
- }
-
- ReleaseSysCache(partcontup);
-
- /*
- * Looks good! Attach this constraint. The action triggers in
- * the new partition become redundant -- the parent table already
- * has equivalent ones, and those will be able to reach the
- * partition. Remove the ones in the partition. We identify them
- * because they have our constraint OID, as well as being on the
- * referenced rel.
- */
- trigrel = heap_open(TriggerRelationId, RowExclusiveLock);
- ScanKeyInit(&key,
- Anum_pg_trigger_tgconstraint,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(fk->conoid));
-
- scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
- NULL, 1, &key);
- while ((trigtup = systable_getnext(scan)) != NULL)
- {
- Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
- ObjectAddress trigger;
-
- if (trgform->tgconstrrelid != fk->conrelid)
- continue;
- if (trgform->tgrelid != fk->confrelid)
- continue;
-
- /*
- * The constraint is originally set up to contain this trigger
- * as an implementation object, so there's a dependency record
- * that links the two; however, since the trigger is no longer
- * needed, we remove the dependency link in order to be able
- * to drop the trigger while keeping the constraint intact.
- */
- deleteDependencyRecordsFor(TriggerRelationId,
- trgform->oid,
- false);
- /* make dependency deletion visible to performDeletion */
- CommandCounterIncrement();
- ObjectAddressSet(trigger, TriggerRelationId,
- trgform->oid);
- performDeletion(&trigger, DROP_RESTRICT, 0);
- /* make trigger drop visible, in case the loop iterates */
- CommandCounterIncrement();
- }
-
- systable_endscan(scan);
- table_close(trigrel, RowExclusiveLock);
-
- ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
- RelationGetRelid(partRel));
- CommandCounterIncrement();
- attach_it = true;
- break;
}
-
- /*
- * If we attached to an existing constraint, there is no need to
- * create a new one. In fact, there's no need to recurse for this
- * constraint to partitions, either.
- */
- if (attach_it)
+ if (attached)
{
ReleaseSysCache(tuple);
continue;
}
+ /* No dice. Set up to create our own constraint */
+ fkconstraint = makeNode(Constraint);
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(partRel),
+ NameStr(constrForm->conname)))
+ fkconstraint->conname =
+ ChooseConstraintName(RelationGetRelationName(partRel),
+ ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+ "fkey",
+ RelationGetNamespace(partRel), NIL);
+ else
+ fkconstraint->conname = NameStr(constrForm->conname);
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+ fkconstraint->fk_matchtype = constrForm->confmatchtype;
+ for (int i = 0; i < numfks; i++)
+ {
+ Form_pg_attribute att;
+
+ att = TupleDescAttr(RelationGetDescr(partRel),
+ mapped_conkey[i] - 1);
+ fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+ makeString(NameStr(att->attname)));
+ }
+
+ indexOid = constrForm->conindid;
constrOid =
- CreateConstraintEntry(NameStr(constrForm->conname),
+ CreateConstraintEntry(fkconstraint->conname,
constrForm->connamespace,
CONSTRAINT_FOREIGN,
- constrForm->condeferrable,
- constrForm->condeferred,
+ fkconstraint->deferrable,
+ fkconstraint->initdeferred,
constrForm->convalidated,
parentConstrOid,
RelationGetRelid(partRel),
@@ -8235,92 +8649,191 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
numfks,
numfks,
InvalidOid, /* not a domain constraint */
- constrForm->conindid, /* same index */
+ indexOid,
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
numfks,
- constrForm->confupdtype,
- constrForm->confdeltype,
- constrForm->confmatchtype,
+ fkconstraint->fk_upd_action,
+ fkconstraint->fk_del_action,
+ fkconstraint->fk_matchtype,
NULL,
NULL,
NULL,
- false,
- 1, false, true);
- subclone = lappend_oid(subclone, constrOid);
+ false, /* islocal */
+ 1, /* inhcount */
+ false, /* conNoInherit */
+ true);
/* Set up partition dependencies for the new constraint */
- ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
- recordDependencyOn(&childAddr, &parentAddr,
- DEPENDENCY_PARTITION_PRI);
- ObjectAddressSet(childTableAddr, RelationRelationId,
+ ObjectAddressSet(address, ConstraintRelationId, constrOid);
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid);
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+ ObjectAddressSet(referenced, RelationRelationId,
RelationGetRelid(partRel));
- recordDependencyOn(&childAddr, &childTableAddr,
- DEPENDENCY_PARTITION_SEC);
-
- fkconstraint = makeNode(Constraint);
- /* for now this is all we need */
- fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
- fkconstraint->fk_upd_action = constrForm->confupdtype;
- fkconstraint->fk_del_action = constrForm->confdeltype;
- fkconstraint->deferrable = constrForm->condeferrable;
- fkconstraint->initdeferred = constrForm->condeferred;
-
- createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
- constrOid, constrForm->conindid, false);
-
- if (cloned)
- {
- ClonedConstraint *newc;
-
- /*
- * Feed back caller about the constraints we created, so that they
- * can set up constraint verification.
- */
- newc = palloc(sizeof(ClonedConstraint));
- newc->relid = RelationGetRelid(partRel);
- newc->refrelid = constrForm->confrelid;
- newc->conindid = constrForm->conindid;
- newc->conid = constrOid;
- newc->constraint = fkconstraint;
-
- *cloned = lappend(*cloned, newc);
- }
+ recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
+ /* Done with the cloned constraint's tuple */
ReleaseSysCache(tuple);
- }
- pfree(attmap);
- list_free_deep(partFKs);
+ /* Make all this visible before recursing */
+ CommandCounterIncrement();
- /*
- * If the partition is partitioned, recurse to handle any constraints that
- * were cloned.
- */
- if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
- subclone != NIL)
- {
- PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
- int i;
-
- for (i = 0; i < partdesc->nparts; i++)
- {
- Relation childRel;
-
- childRel = table_open(partdesc->oids[i], AccessExclusiveLock);
- CloneFkReferencing(pg_constraint,
- partRel,
- childRel,
- subclone,
- cloned);
- table_close(childRel, NoLock); /* keep lock till commit */
- }
+ addFkRecurseReferencing(wqueue,
+ fkconstraint,
+ partRel,
+ pkrel,
+ indexOid,
+ constrOid,
+ numfks,
+ confkey,
+ mapped_conkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ false, /* no old check exists */
+ AccessExclusiveLock);
+ table_close(pkrel, NoLock);
}
}
+/*
+ * When the parent of a partition receives [the referencing side of] a foreign
+ * key, we must propagate that foreign key to the partition. However, the
+ * partition might already have an equivalent foreign key; this routine
+ * compares the given ForeignKeyCacheInfo (in the partition) to the FK defined
+ * by the other parameters. If they are equivalent, create the link between
+ * the two constraints and return true.
+ *
+ * If the given FK does not match the one defined by rest of the params,
+ * return false.
+ */
+static bool
+tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+ Oid partRelid,
+ Oid parentConstrOid,
+ int numfks,
+ AttrNumber *mapped_conkey,
+ AttrNumber *confkey,
+ Oid *conpfeqop)
+{
+ HeapTuple parentConstrTup;
+ Form_pg_constraint parentConstr;
+ HeapTuple partcontup;
+ Form_pg_constraint partConstr;
+ Relation trigrel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ HeapTuple trigtup;
+
+ parentConstrTup = SearchSysCache1(CONSTROID,
+ ObjectIdGetDatum(parentConstrOid));
+ if (!parentConstrTup)
+ elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid);
+ parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup);
+
+ /*
+ * Do some quick & easy initial checks. If any of these fail, we cannot
+ * use this constraint.
+ */
+ if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks)
+ {
+ ReleaseSysCache(parentConstrTup);
+ return false;
+ }
+ for (int i = 0; i < numfks; i++)
+ {
+ if (fk->conkey[i] != mapped_conkey[i] ||
+ fk->confkey[i] != confkey[i] ||
+ fk->conpfeqop[i] != conpfeqop[i])
+ {
+ ReleaseSysCache(parentConstrTup);
+ return false;
+ }
+ }
+
+ /*
+ * Looks good so far; do some more extensive checks. Presumably the check
+ * for 'convalidated' could be dropped, since we don't really care about
+ * that, but let's be careful for now.
+ */
+ partcontup = SearchSysCache1(CONSTROID,
+ ObjectIdGetDatum(fk->conoid));
+ if (!partcontup)
+ elog(ERROR, "cache lookup failed for constraint %u",
+ fk->conoid);
+ partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+ if (OidIsValid(partConstr->conparentid) ||
+ !partConstr->convalidated ||
+ partConstr->condeferrable != parentConstr->condeferrable ||
+ partConstr->condeferred != parentConstr->condeferred ||
+ partConstr->confupdtype != parentConstr->confupdtype ||
+ partConstr->confdeltype != parentConstr->confdeltype ||
+ partConstr->confmatchtype != parentConstr->confmatchtype)
+ {
+ ReleaseSysCache(parentConstrTup);
+ ReleaseSysCache(partcontup);
+ return false;
+ }
+
+ ReleaseSysCache(partcontup);
+ ReleaseSysCache(parentConstrTup);
+
+ /*
+ * Looks good! Attach this constraint. The action triggers in the new
+ * partition become redundant -- the parent table already has equivalent
+ * ones, and those will be able to reach the partition. Remove the ones
+ * in the partition. We identify them because they have our constraint
+ * OID, as well as being on the referenced rel.
+ */
+ trigrel = table_open(TriggerRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_trigger_tgconstraint,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(fk->conoid));
+
+ scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
+ NULL, 1, &key);
+ while ((trigtup = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
+ ObjectAddress trigger;
+
+ if (trgform->tgconstrrelid != fk->conrelid)
+ continue;
+ if (trgform->tgrelid != fk->confrelid)
+ continue;
+
+ /*
+ * The constraint is originally set up to contain this trigger as an
+ * implementation object, so there's a dependency record that links
+ * the two; however, since the trigger is no longer needed, we remove
+ * the dependency link in order to be able to drop the trigger while
+ * keeping the constraint intact.
+ */
+ deleteDependencyRecordsFor(TriggerRelationId,
+ trgform->oid,
+ false);
+ /* make dependency deletion visible to performDeletion */
+ CommandCounterIncrement();
+ ObjectAddressSet(trigger, TriggerRelationId,
+ trgform->oid);
+ performDeletion(&trigger, DROP_RESTRICT, 0);
+ /* make trigger drop visible, in case the loop iterates */
+ CommandCounterIncrement();
+ }
+
+ systable_endscan(scan);
+ table_close(trigrel, RowExclusiveLock);
+
+ ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
+ CommandCounterIncrement();
+ return true;
+}
+
+
/*
* ALTER TABLE ALTER CONSTRAINT
*
@@ -8440,8 +8953,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
/*
* Update deferrability of RI_FKey_noaction_del,
* RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd
- * triggers, but not others; see createForeignKeyTriggers and
- * CreateFKCheckTrigger.
+ * triggers, but not others; see createForeignKeyActionTriggers
+ * and CreateFKCheckTrigger.
*/
if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL &&
tgform->tgfoid != F_RI_FKEY_NOACTION_UPD &&
@@ -9348,37 +9861,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
indexOid, false);
}
-/*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
- */
-void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
- Oid constraintOid, Oid indexOid, bool create_action)
-{
- /*
- * For the referenced side, create action triggers, if requested. (If the
- * referencing side is partitioned, there is still only one trigger, which
- * runs on the referenced side and points to the top of the referencing
- * hierarchy.)
- */
- if (create_action)
- createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
- indexOid);
-
- /*
- * For the referencing side, create the check triggers. We only need
- * these on the partitions.
- */
- if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
- createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
- fkconstraint, constraintOid, indexOid);
-
- CommandCounterIncrement();
-}
-
/*
* ALTER TABLE DROP CONSTRAINT
*
@@ -14778,8 +15260,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
bool found_whole_row;
Oid defaultPartOid;
List *partBoundConstraint;
- List *cloned;
- ListCell *l;
/*
* We must lock the default partition if one exists, because attaching a
@@ -14960,33 +15440,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
CloneRowTriggersToPartition(rel, attachrel);
/*
- * Clone foreign key constraints, and setup for Phase 3 to verify them.
+ * Clone foreign key constraints. Callee is responsible for setting up
+ * for phase 3 constraint verification.
*/
- cloned = NIL;
- CloneForeignKeyConstraints(RelationGetRelid(rel),
- RelationGetRelid(attachrel), &cloned);
- foreach(l, cloned)
- {
- ClonedConstraint *clonedcon = lfirst(l);
- NewConstraint *newcon;
- Relation clonedrel;
- AlteredTableInfo *parttab;
-
- clonedrel = relation_open(clonedcon->relid, NoLock);
- parttab = ATGetQueueEntry(wqueue, clonedrel);
-
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = clonedcon->constraint->conname;
- newcon->contype = CONSTR_FOREIGN;
- newcon->refrelid = clonedcon->refrelid;
- newcon->refindid = clonedcon->conindid;
- newcon->conid = clonedcon->conid;
- newcon->qual = (Node *) clonedcon->constraint;
-
- parttab->constraints = lappend(parttab->constraints, newcon);
-
- relation_close(clonedrel, NoLock);
- }
+ CloneForeignKeyConstraints(wqueue, rel, attachrel);
/*
* Generate partition constraint from the partition bound specification.
@@ -15177,6 +15634,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
RelationGetRelid(attachrel));
update_relispartition(NULL, cldIdxId, true);
found = true;
+
+ CommandCounterIncrement();
break;
}
}
@@ -15368,6 +15827,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
partRel = table_openrv(name, ShareUpdateExclusiveLock);
+ /* Ensure that foreign keys still hold after this detach */
+ ATDetachCheckNoForeignKeyRefs(partRel);
+
/* All inheritance related checks are performed within the function */
RemoveInheritance(partRel, rel);
@@ -15486,6 +15948,28 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
}
list_free_deep(fks);
+ /*
+ * Any sub-constrains that are in the referenced-side of a larger
+ * constraint have to be removed. This partition is no longer part of the
+ * key space of the constraint.
+ */
+ foreach(cell, GetParentedForeignKeyRefs(partRel))
+ {
+ Oid constrOid = lfirst_oid(cell);
+ ObjectAddress constraint;
+
+ ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+ deleteDependencyRecordsForClass(ConstraintRelationId,
+ constrOid,
+ ConstraintRelationId,
+ DEPENDENCY_INTERNAL);
+ CommandCounterIncrement();
+
+ ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
+ performDeletion(&constraint, DROP_RESTRICT, 0);
+ }
+ CommandCounterIncrement();
+
/*
* Invalidate the parent's relcache so that the partition is no longer
* included in its partition descriptor.
@@ -15866,3 +16350,107 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
if (opened)
table_close(classRel, RowExclusiveLock);
}
+
+/*
+ * Return an OID list of constraints that reference the given relation
+ * that are marked as having a parent constraints.
+ */
+static List *
+GetParentedForeignKeyRefs(Relation partition)
+{
+ Relation pg_constraint;
+ HeapTuple tuple;
+ SysScanDesc scan;
+ ScanKeyData key[2];
+ List *constraints = NIL;
+
+ /*
+ * If no indexes, or no columns are referenceable by FKs, we can avoid the
+ * scan.
+ */
+ if (RelationGetIndexList(partition) == NIL ||
+ bms_is_empty(RelationGetIndexAttrBitmap(partition,
+ INDEX_ATTR_BITMAP_KEY)))
+ return NIL;
+
+ /* Search for constraints referencing this table */
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
+ ScanKeyInit(&key[1],
+ Anum_pg_constraint_contype, BTEqualStrategyNumber,
+ F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+
+ /* XXX This is a seqscan, as we don't have a usable index */
+ scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key);
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /*
+ * We only need to process constraints that are part of larger ones.
+ */
+ if (!OidIsValid(constrForm->conparentid))
+ continue;
+
+ constraints = lappend_oid(constraints, constrForm->oid);
+ }
+
+ systable_endscan(scan);
+ table_close(pg_constraint, AccessShareLock);
+
+ return constraints;
+}
+
+/*
+ * During DETACH PARTITION, verify that any foreign keys pointing to the
+ * partitioned table would not become invalid. An error is raised if any
+ * referenced values exist.
+ */
+static void
+ATDetachCheckNoForeignKeyRefs(Relation partition)
+{
+ List *constraints;
+ ListCell *cell;
+
+ constraints = GetParentedForeignKeyRefs(partition);
+
+ foreach(cell, constraints)
+ {
+ Oid constrOid = lfirst_oid(cell);
+ HeapTuple tuple;
+ Form_pg_constraint constrForm;
+ Relation rel;
+ Trigger trig;
+
+ tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+ constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ Assert(OidIsValid(constrForm->conparentid));
+ Assert(constrForm->confrelid == RelationGetRelid(partition));
+
+ /* prevent data changes into the referencing table until commit */
+ rel = table_open(constrForm->conrelid, ShareLock);
+
+ MemSet(&trig, 0, sizeof(trig));
+ trig.tgoid = InvalidOid;
+ trig.tgname = NameStr(constrForm->conname);
+ trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
+ trig.tgisinternal = true;
+ trig.tgconstrrelid = RelationGetRelid(partition);
+ trig.tgconstrindid = constrForm->conindid;
+ trig.tgconstraint = constrForm->oid;
+ trig.tgdeferrable = false;
+ trig.tginitdeferred = false;
+ /* we needn't fill in remaining fields */
+
+ RI_PartitionRemove_Check(&trig, rel, partition);
+
+ ReleaseSysCache(tuple);
+
+ table_close(rel, NoLock);
+ }
+}
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 72f8a9d69cf..095334b3363 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -50,6 +50,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rls.h"
+#include "utils/ruleutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
Datum *vals, char *nulls);
static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
- TupleTableSlot *violator, TupleDesc tupdesc,
- int queryno) pg_attribute_noreturn();
+ TupleTableSlot *violatorslot, TupleDesc tupdesc,
+ int queryno, bool partgone) pg_attribute_noreturn();
/*
@@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
+ const char *pk_only;
/* ----------
* The query string built is
- * SELECT 1 FROM ONLY x WHERE pkatt1 = $1 [AND ...]
+ * SELECT 1 FROM [ONLY] x WHERE pkatt1 = $1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* corresponding FK attributes.
* ----------
*/
initStringInfo(&querybuf);
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
@@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
char attname[MAX_QUOTED_NAME_LEN];
char paramname[16];
const char *querysep;
+ const char *pk_only;
Oid queryoids[RI_MAX_NUMKEYS];
/* ----------
* The query string built is
- * SELECT 1 FROM ONLY x WHERE pkatt1 = $1 [AND ...]
+ * SELECT 1 FROM [ONLY] x WHERE pkatt1 = $1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* PK attributes themselves.
* ----------
*/
initStringInfo(&querybuf);
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
@@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
RangeTblEntry *fkrte;
const char *sep;
const char *fk_only;
+ const char *pk_only;
int save_nestlevel;
char workmembuf[32];
int spi_result;
@@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
/*----------
* The query string built is:
* SELECT fk.keycols FROM [ONLY] relname fk
- * LEFT OUTER JOIN ONLY pkrelname pk
+ * LEFT OUTER JOIN [ONLY] pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE pk.pkkeycol1 IS NULL AND
* For MATCH SIMPLE:
@@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
quoteRelationName(fkrelname, fk_rel);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
appendStringInfo(&querybuf,
- " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
- fk_only, fkrelname, pkrelname);
+ " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
+ fk_only, fkrelname, pk_only, pkrelname);
strcpy(pkattname, "pk.");
strcpy(fkattname, "fk.");
@@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
ri_ReportViolation(&fake_riinfo,
pk_rel, fk_rel,
slot, tupdesc,
- RI_PLAN_CHECK_LOOKUPPK);
+ RI_PLAN_CHECK_LOOKUPPK, false);
ExecDropSingleTupleTableSlot(slot);
}
@@ -1546,6 +1558,214 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
return true;
}
+/*
+ * RI_PartitionRemove_Check -
+ *
+ * Verify no referencing values exist, when a partition is detached on
+ * the referenced side of a foreign key constraint.
+ */
+void
+RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
+{
+ const RI_ConstraintInfo *riinfo;
+ StringInfoData querybuf;
+ char *constraintDef;
+ char pkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char fkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char pkattname[MAX_QUOTED_NAME_LEN + 3];
+ char fkattname[MAX_QUOTED_NAME_LEN + 3];
+ const char *sep;
+ const char *fk_only;
+ int save_nestlevel;
+ char workmembuf[32];
+ int spi_result;
+ SPIPlanPtr qplan;
+ int i;
+
+ riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+ /*
+ * We don't check permissions before displaying the error message, on the
+ * assumption that the user detaching the partition must have enough
+ * privileges to examine the table contents anyhow.
+ */
+
+ /*----------
+ * The query string built is:
+ * SELECT fk.keycols FROM [ONLY] relname fk
+ * JOIN pkrelname pk
+ * ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+ * WHERE () AND
+ * For MATCH SIMPLE:
+ * (fk.keycol1 IS NOT NULL [AND ...])
+ * For MATCH FULL:
+ * (fk.keycol1 IS NOT NULL [OR ...])
+ *
+ * We attach COLLATE clauses to the operators when comparing columns
+ * that have different collations.
+ *----------
+ */
+ initStringInfo(&querybuf);
+ appendStringInfoString(&querybuf, "SELECT ");
+ sep = "";
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ quoteOneName(fkattname,
+ RIAttName(fk_rel, riinfo->fk_attnums[i]));
+ appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
+ sep = ", ";
+ }
+
+ quoteRelationName(pkrelname, pk_rel);
+ quoteRelationName(fkrelname, fk_rel);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
+ appendStringInfo(&querybuf,
+ " FROM %s%s fk JOIN %s pk ON",
+ fk_only, fkrelname, pkrelname);
+ strcpy(pkattname, "pk.");
+ strcpy(fkattname, "fk.");
+ sep = "(";
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+ Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+ Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+ Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+ quoteOneName(pkattname + 3,
+ RIAttName(pk_rel, riinfo->pk_attnums[i]));
+ quoteOneName(fkattname + 3,
+ RIAttName(fk_rel, riinfo->fk_attnums[i]));
+ ri_GenerateQual(&querybuf, sep,
+ pkattname, pk_type,
+ riinfo->pf_eq_oprs[i],
+ fkattname, fk_type);
+ if (pk_coll != fk_coll)
+ ri_GenerateQualCollation(&querybuf, pk_coll);
+ sep = "AND";
+ }
+
+ /*
+ * Start the WHERE clause with the partition constraint (except if this is
+ * the default partition and there's no other partition, because the
+ * partition constraint is the empty string in that case.)
+ */
+ constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
+ if (constraintDef && constraintDef[0] != '\0')
+ appendStringInfo(&querybuf, ") WHERE %s AND (",
+ constraintDef);
+ else
+ appendStringInfo(&querybuf, ") WHERE (");
+
+ sep = "";
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
+ appendStringInfo(&querybuf,
+ "%sfk.%s IS NOT NULL",
+ sep, fkattname);
+ switch (riinfo->confmatchtype)
+ {
+ case FKCONSTR_MATCH_SIMPLE:
+ sep = " AND ";
+ break;
+ case FKCONSTR_MATCH_FULL:
+ sep = " OR ";
+ break;
+ }
+ }
+ appendStringInfoChar(&querybuf, ')');
+
+ /*
+ * Temporarily increase work_mem so that the check query can be executed
+ * more efficiently. It seems okay to do this because the query is simple
+ * enough to not use a multiple of work_mem, and one typically would not
+ * have many large foreign-key validations happening concurrently. So
+ * this seems to meet the criteria for being considered a "maintenance"
+ * operation, and accordingly we use maintenance_work_mem.
+ *
+ * We use the equivalent of a function SET option to allow the setting to
+ * persist for exactly the duration of the check query. guc.c also takes
+ * care of undoing the setting on error.
+ */
+ save_nestlevel = NewGUCNestLevel();
+
+ snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
+ (void) set_config_option("work_mem", workmembuf,
+ PGC_USERSET, PGC_S_SESSION,
+ GUC_ACTION_SAVE, true, 0, false);
+
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ /*
+ * Generate the plan. We don't need to cache it, and there are no
+ * arguments to the plan.
+ */
+ qplan = SPI_prepare(querybuf.data, 0, NULL);
+
+ if (qplan == NULL)
+ elog(ERROR, "SPI_prepare returned %s for %s",
+ SPI_result_code_string(SPI_result), querybuf.data);
+
+ /*
+ * Run the plan. For safety we force a current snapshot to be used. (In
+ * transaction-snapshot mode, this arguably violates transaction isolation
+ * rules, but we really haven't got much choice.) We don't need to
+ * register the snapshot, because SPI_execute_snapshot will see to it. We
+ * need at most one tuple returned, so pass limit = 1.
+ */
+ spi_result = SPI_execute_snapshot(qplan,
+ NULL, NULL,
+ GetLatestSnapshot(),
+ InvalidSnapshot,
+ true, false, 1);
+
+ /* Check result */
+ if (spi_result != SPI_OK_SELECT)
+ elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+
+ /* Did we find a tuple that would violate the constraint? */
+ if (SPI_processed > 0)
+ {
+ TupleTableSlot *slot;
+ HeapTuple tuple = SPI_tuptable->vals[0];
+ TupleDesc tupdesc = SPI_tuptable->tupdesc;
+ RI_ConstraintInfo fake_riinfo;
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ heap_deform_tuple(tuple, tupdesc,
+ slot->tts_values, slot->tts_isnull);
+ ExecStoreVirtualTuple(slot);
+
+ /*
+ * The columns to look at in the result tuple are 1..N, not whatever
+ * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation
+ * will behave properly.
+ *
+ * In addition to this, we have to pass the correct tupdesc to
+ * ri_ReportViolation, overriding its normal habit of using the pk_rel
+ * or fk_rel's tupdesc.
+ */
+ memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
+ for (i = 0; i < fake_riinfo.nkeys; i++)
+ fake_riinfo.pk_attnums[i] = i + 1;
+
+ ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
+ slot, tupdesc, 0, true);
+ }
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ /*
+ * Restore work_mem.
+ */
+ AtEOXact_GUC(true, save_nestlevel);
+}
+
/* ----------
* Local functions below
@@ -2078,7 +2298,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
pk_rel, fk_rel,
newslot ? newslot : oldslot,
NULL,
- qkey->constr_queryno);
+ qkey->constr_queryno, false);
return SPI_processed != 0;
}
@@ -2119,7 +2339,7 @@ static void
ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
- int queryno)
+ int queryno, bool partgone)
{
StringInfoData key_names;
StringInfoData key_values;
@@ -2158,9 +2378,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
*
* Check table-level permissions next and, failing that, column-level
* privileges.
+ *
+ * When a partition at the referenced side is being detached/dropped, we
+ * needn't check, since the user must be the table owner anyway.
*/
-
- if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
+ if (partgone)
+ has_perm = true;
+ else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
{
aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
if (aclresult != ACLCHECK_OK)
@@ -2222,7 +2446,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
}
}
- if (onfk)
+ if (partgone)
+ ereport(ERROR,
+ (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+ errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
+ RelationGetRelationName(pk_rel),
+ NameStr(riinfo->conname)),
+ errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
+ key_names.data, key_values.data,
+ RelationGetRelationName(fk_rel))));
+ else if (onfk)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 88dc09cae68..7b142e3b188 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1836,6 +1836,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(string_to_text(consrc));
}
+/*
+ * pg_get_partconstrdef_string
+ *
+ * Returns the partition constraint as a C-string for the input relation, with
+ * the given alias. No pretty-printing.
+ */
+char *
+pg_get_partconstrdef_string(Oid partitionId, char *aliasname)
+{
+ Expr *constr_expr;
+ List *context;
+
+ constr_expr = get_partition_qual_relid(partitionId);
+ context = deparse_context_for(aliasname, partitionId);
+
+ return deparse_expression((Node *) constr_expr, context, true, false);
+}
+
/*
* pg_get_constraintdef
*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 036810303a6..f7f7285acca 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2452,9 +2452,12 @@ describeOneTableDetails(const char *schemaname,
" pg_catalog.pg_get_constraintdef(r.oid, true) as condef,\n"
" conrelid::pg_catalog.regclass AS ontable\n"
"FROM pg_catalog.pg_constraint r\n"
- "WHERE r.conrelid = '%s' AND r.contype = 'f'\n"
- "ORDER BY conname;",
+ "WHERE r.conrelid = '%s' AND r.contype = 'f'\n",
oid);
+
+ if (pset.sversion >= 120000)
+ appendPQExpBuffer(&buf, " AND conparentid = 0\n");
+ appendPQExpBuffer(&buf, "ORDER BY conname");
}
result = PSQLexec(buf.data);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index ec3bb90b01b..96927b900d6 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -76,10 +76,6 @@ extern void find_composite_type_dependencies(Oid typeOid,
extern void check_of_type(HeapTuple typetuple);
-extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
- Constraint *fkconstraint, Oid constraintOid,
- Oid indexOid, bool create_action);
-
extern void register_on_commit_action(Oid relid, OnCommitAction action);
extern void remove_on_commit_action(Oid relid);
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index 846679ecc12..6d460ffd742 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -263,6 +263,8 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
TupleTableSlot *old_slot, TupleTableSlot *new_slot);
extern bool RI_Initial_Check(Trigger *trigger,
Relation fk_rel, Relation pk_rel);
+extern void RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel,
+ Relation pk_rel);
/* result values for RI_FKey_trigger_type: */
#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */
diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h
index 3ebc01e7147..7c49e9d0a83 100644
--- a/src/include/utils/ruleutils.h
+++ b/src/include/utils/ruleutils.h
@@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid);
extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
+extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
extern char *pg_get_constraintdef_command(Oid constraintId);
extern char *deparse_expression(Node *expr, List *dpcontext,
diff --git a/src/test/isolation/expected/fk-partitioned-1.out b/src/test/isolation/expected/fk-partitioned-1.out
new file mode 100644
index 00000000000..aea2b6d56b4
--- /dev/null
+++ b/src/test/isolation/expected/fk-partitioned-1.out
@@ -0,0 +1,133 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1b s1d s1c s2b s2a s2c
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s1c: commit;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s1d s2b s1c s2a s2c
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2b: begin;
+step s1c: commit;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s1d s2b s2a s1c s2c
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1c: commit;
+step s2a: <... completed>
+error in steps s1c s2a: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2b s1d s1c s2a s2c
+step s1b: begin;
+step s2b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s1c: commit;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2b s1d s2a s1c s2c
+step s1b: begin;
+step s2b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1c: commit;
+step s2a: <... completed>
+error in steps s1c s2a: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2b s2a s1d s2c s1c
+step s1b: begin;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1d: delete from ppk1 where a = 1;
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s1b s2b s2a s2c s1d s1c
+step s1b: begin;
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s2c: commit;
+step s1d: delete from ppk1 where a = 1;
+ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s1b s1d s1c s2a s2c
+step s2b: begin;
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s1c: commit;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s2b s1b s1d s2a s1c s2c
+step s2b: begin;
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1c: commit;
+step s2a: <... completed>
+error in steps s1c s2a: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s2b s1b s2a s1d s2c s1c
+step s2b: begin;
+step s1b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1d: delete from ppk1 where a = 1;
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s1b s2a s2c s1d s1c
+step s2b: begin;
+step s1b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s2c: commit;
+step s1d: delete from ppk1 where a = 1;
+ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s2a s1b s1d s2c s1c
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s2a s1b s2c s1d s1c
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s1b: begin;
+step s2c: commit;
+step s1d: delete from ppk1 where a = 1;
+ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s2b s2a s2c s1b s1d s1c
+step s2b: begin;
+step s2a: alter table pfk attach partition pfk1 for values in (1);
+step s2c: commit;
+step s1b: begin;
+step s1d: delete from ppk1 where a = 1;
+ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
diff --git a/src/test/isolation/expected/fk-partitioned-2.out b/src/test/isolation/expected/fk-partitioned-2.out
new file mode 100644
index 00000000000..722b615c6ea
--- /dev/null
+++ b/src/test/isolation/expected/fk-partitioned-2.out
@@ -0,0 +1,70 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1b s1d s2b s2i s1c s2c
+step s1b: begin;
+step s1d: delete from ppk where a = 1;
+step s2b: begin;
+step s2i: insert into pfk values (1);
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s1d s2bs s2i s1c s2c
+step s1b: begin;
+step s1d: delete from ppk where a = 1;
+step s2bs: begin isolation level serializable; select 1;
+?column?
+
+1
+step s2i: insert into pfk values (1);
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR: could not serialize access due to concurrent update
+step s2c: commit;
+
+starting permutation: s1b s2b s1d s2i s1c s2c
+step s1b: begin;
+step s2b: begin;
+step s1d: delete from ppk where a = 1;
+step s2i: insert into pfk values (1);
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
+step s2c: commit;
+
+starting permutation: s1b s2bs s1d s2i s1c s2c
+step s1b: begin;
+step s2bs: begin isolation level serializable; select 1;
+?column?
+
+1
+step s1d: delete from ppk where a = 1;
+step s2i: insert into pfk values (1);
+step s1c: commit;
+step s2i: <... completed>
+error in steps s1c s2i: ERROR: could not serialize access due to concurrent update
+step s2c: commit;
+
+starting permutation: s1b s2b s2i s1d s2c s1c
+step s1b: begin;
+step s2b: begin;
+step s2i: insert into pfk values (1);
+step s1d: delete from ppk where a = 1;
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
+
+starting permutation: s1b s2bs s2i s1d s2c s1c
+step s1b: begin;
+step s2bs: begin isolation level serializable; select 1;
+?column?
+
+1
+step s2i: insert into pfk values (1);
+step s1d: delete from ppk where a = 1;
+step s2c: commit;
+step s1d: <... completed>
+error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
+step s1c: commit;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index f1ae50e5ba8..11cd24fc981 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -24,6 +24,8 @@ test: deadlock-soft-2
test: fk-contention
test: fk-deadlock
test: fk-deadlock2
+test: fk-partitioned-1
+test: fk-partitioned-2
test: eval-plan-qual
test: lock-update-delete
test: lock-update-traversal
diff --git a/src/test/isolation/specs/fk-partitioned-1.spec b/src/test/isolation/specs/fk-partitioned-1.spec
new file mode 100644
index 00000000000..4c760e89b34
--- /dev/null
+++ b/src/test/isolation/specs/fk-partitioned-1.spec
@@ -0,0 +1,45 @@
+# Verify that cloning a foreign key constraint to a partition ensures
+# that referenced values exist, even if they're being concurrently
+# deleted.
+setup {
+drop table if exists ppk, pfk, pfk1;
+ create table ppk (a int primary key) partition by list (a);
+ create table ppk1 partition of ppk for values in (1);
+ insert into ppk values (1);
+ create table pfk (a int references ppk) partition by list (a);
+ create table pfk1 (a int not null);
+ insert into pfk1 values (1);
+}
+
+session "s1"
+step "s1b" { begin; }
+step "s1d" { delete from ppk1 where a = 1; }
+step "s1c" { commit; }
+
+session "s2"
+step "s2b" { begin; }
+step "s2a" { alter table pfk attach partition pfk1 for values in (1); }
+step "s2c" { commit; }
+
+teardown { drop table ppk, pfk, pfk1; }
+
+permutation "s1b" "s1d" "s1c" "s2b" "s2a" "s2c"
+permutation "s1b" "s1d" "s2b" "s1c" "s2a" "s2c"
+permutation "s1b" "s1d" "s2b" "s2a" "s1c" "s2c"
+#permutation "s1b" "s1d" "s2b" "s2a" "s2c" "s1c"
+permutation "s1b" "s2b" "s1d" "s1c" "s2a" "s2c"
+permutation "s1b" "s2b" "s1d" "s2a" "s1c" "s2c"
+#permutation "s1b" "s2b" "s1d" "s2a" "s2c" "s1c"
+#permutation "s1b" "s2b" "s2a" "s1d" "s1c" "s2c"
+permutation "s1b" "s2b" "s2a" "s1d" "s2c" "s1c"
+permutation "s1b" "s2b" "s2a" "s2c" "s1d" "s1c"
+permutation "s2b" "s1b" "s1d" "s1c" "s2a" "s2c"
+permutation "s2b" "s1b" "s1d" "s2a" "s1c" "s2c"
+#permutation "s2b" "s1b" "s1d" "s2a" "s2c" "s1c"
+#permutation "s2b" "s1b" "s2a" "s1d" "s1c" "s2c"
+permutation "s2b" "s1b" "s2a" "s1d" "s2c" "s1c"
+permutation "s2b" "s1b" "s2a" "s2c" "s1d" "s1c"
+#permutation "s2b" "s2a" "s1b" "s1d" "s1c" "s2c"
+permutation "s2b" "s2a" "s1b" "s1d" "s2c" "s1c"
+permutation "s2b" "s2a" "s1b" "s2c" "s1d" "s1c"
+permutation "s2b" "s2a" "s2c" "s1b" "s1d" "s1c"
diff --git a/src/test/isolation/specs/fk-partitioned-2.spec b/src/test/isolation/specs/fk-partitioned-2.spec
new file mode 100644
index 00000000000..f1a76e801cf
--- /dev/null
+++ b/src/test/isolation/specs/fk-partitioned-2.spec
@@ -0,0 +1,29 @@
+# Make sure that FKs referencing partitioned tables actually work.
+setup {
+ drop table if exists ppk, pfk, pfk1;
+ create table ppk (a int primary key) partition by list (a);
+ create table ppk1 partition of ppk for values in (1);
+ insert into ppk values (1);
+ create table pfk (a int references ppk) partition by list (a);
+ create table pfk1 partition of pfk for values in (1);
+}
+
+session "s1"
+step "s1b" { begin; }
+step "s1d" { delete from ppk where a = 1; }
+step "s1c" { commit; }
+
+session "s2"
+step "s2b" { begin; }
+step "s2bs" { begin isolation level serializable; select 1; }
+step "s2i" { insert into pfk values (1); }
+step "s2c" { commit; }
+
+teardown { drop table ppk, pfk, pfk1; }
+
+permutation "s1b" "s1d" "s2b" "s2i" "s1c" "s2c"
+permutation "s1b" "s1d" "s2bs" "s2i" "s1c" "s2c"
+permutation "s1b" "s2b" "s1d" "s2i" "s1c" "s2c"
+permutation "s1b" "s2bs" "s1d" "s2i" "s1c" "s2c"
+permutation "s1b" "s2b" "s2i" "s1d" "s2c" "s1c"
+permutation "s1b" "s2bs" "s2i" "s1d" "s2c" "s1c"
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 4f7acb9b1ef..620eb4392be 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1532,19 +1532,6 @@ drop table pktable2, fktable2;
--
-- Foreign keys and partitioned tables
--
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
- PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
- FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-ERROR: cannot reference partitioned table "fk_partitioned_pk"
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
- REFERENCES fk_partitioned_pk;
-ERROR: cannot reference partitioned table "fk_partitioned_pk"
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-- Creation of a partitioned hierarchy with irregular definitions
CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
PRIMARY KEY (a, b));
@@ -1686,7 +1673,7 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x);
CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT;
INSERT INTO fk_partitioned_fk_full VALUES (1, NULL);
ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL; -- fails
-ERROR: insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
+ERROR: insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
DETAIL: MATCH FULL does not allow mixing of null and nonnull key values.
TRUNCATE fk_partitioned_fk_full;
ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;
@@ -1902,7 +1889,7 @@ CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES F
INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
FOR VALUES IN (1600);
-ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
+ERROR: insert or update on table "fk_partitioned_fk_2_1" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
DETAIL: Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
@@ -2037,3 +2024,320 @@ drop cascades to table fkpart1.fk_part
drop cascades to table fkpart1.fk_part_1
drop cascades to table fkpart0.pkey
drop cascades to table fkpart0.fk_part
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions added (created and
+-- attached) after creating the foreign key.
+CREATE SCHEMA fkpart3;
+SET search_path TO fkpart3;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE pk2 (b int, a int);
+ALTER TABLE pk2 DROP COLUMN b;
+ALTER TABLE pk2 ALTER a SET NOT NULL;
+ALTER TABLE pk ATTACH PARTITION pk2 FOR VALUES FROM (1000) TO (2000);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (0) TO (750);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE fk2 (b int, a int) ;
+ALTER TABLE fk2 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk2 FOR VALUES FROM (750) TO (3500);
+CREATE TABLE pk3 PARTITION OF pk FOR VALUES FROM (2000) TO (3000);
+CREATE TABLE pk4 (LIKE pk);
+ALTER TABLE pk ATTACH PARTITION pk4 FOR VALUES FROM (3000) TO (4000);
+CREATE TABLE pk5 (c int, b int, a int NOT NULL) PARTITION BY RANGE (a);
+ALTER TABLE pk5 DROP COLUMN b, DROP COLUMN c;
+CREATE TABLE pk51 PARTITION OF pk5 FOR VALUES FROM (4000) TO (4500);
+CREATE TABLE pk52 PARTITION OF pk5 FOR VALUES FROM (4500) TO (5000);
+ALTER TABLE pk ATTACH PARTITION pk5 FOR VALUES FROM (4000) TO (5000);
+CREATE TABLE fk3 PARTITION OF fk FOR VALUES FROM (3500) TO (5000);
+-- these should fail: referenced value not present
+INSERT into fk VALUES (1);
+ERROR: insert or update on table "fk1" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(1) is not present in table "pk".
+INSERT into fk VALUES (1000);
+ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(1000) is not present in table "pk".
+INSERT into fk VALUES (2000);
+ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(2000) is not present in table "pk".
+INSERT into fk VALUES (3000);
+ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(3000) is not present in table "pk".
+INSERT into fk VALUES (4000);
+ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(4000) is not present in table "pk".
+INSERT into fk VALUES (4500);
+ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(4500) is not present in table "pk".
+-- insert into the referenced table, now they should work
+INSERT into pk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+INSERT into fk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+-- should fail: referencing value present
+DELETE FROM pk WHERE a = 1;
+ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL: Key (a)=(1) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 1000;
+ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL: Key (a)=(1000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 2000;
+ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL: Key (a)=(2000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 3000;
+ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL: Key (a)=(3000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 4000;
+ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL: Key (a)=(4000) is still referenced from table "fk".
+DELETE FROM pk WHERE a = 4500;
+ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL: Key (a)=(4500) is still referenced from table "fk".
+UPDATE pk SET a = 2 WHERE a = 1;
+ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL: Key (a)=(1) is still referenced from table "fk".
+UPDATE pk SET a = 1002 WHERE a = 1000;
+ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL: Key (a)=(1000) is still referenced from table "fk".
+UPDATE pk SET a = 2002 WHERE a = 2000;
+ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL: Key (a)=(2000) is still referenced from table "fk".
+UPDATE pk SET a = 3002 WHERE a = 3000;
+ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL: Key (a)=(3000) is still referenced from table "fk".
+UPDATE pk SET a = 4002 WHERE a = 4000;
+ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL: Key (a)=(4000) is still referenced from table "fk".
+UPDATE pk SET a = 4502 WHERE a = 4500;
+ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL: Key (a)=(4500) is still referenced from table "fk".
+-- now they should work
+DELETE FROM fk;
+UPDATE pk SET a = 2 WHERE a = 1;
+DELETE FROM pk WHERE a = 2;
+UPDATE pk SET a = 1002 WHERE a = 1000;
+DELETE FROM pk WHERE a = 1002;
+UPDATE pk SET a = 2002 WHERE a = 2000;
+DELETE FROM pk WHERE a = 2002;
+UPDATE pk SET a = 3002 WHERE a = 3000;
+DELETE FROM pk WHERE a = 3002;
+UPDATE pk SET a = 4002 WHERE a = 4000;
+DELETE FROM pk WHERE a = 4002;
+UPDATE pk SET a = 4502 WHERE a = 4500;
+DELETE FROM pk WHERE a = 4502;
+CREATE SCHEMA fkpart4;
+SET search_path TO fkpart4;
+-- dropping/detaching PARTITIONs is prevented if that would break
+-- a foreign key's existing data
+CREATE TABLE droppk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE droppk1 PARTITION OF droppk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE droppk_d PARTITION OF droppk DEFAULT;
+CREATE TABLE droppk2 PARTITION OF droppk FOR VALUES FROM (1000) TO (2000)
+ PARTITION BY RANGE (a);
+CREATE TABLE droppk21 PARTITION OF droppk2 FOR VALUES FROM (1000) TO (1400);
+CREATE TABLE droppk2_d PARTITION OF droppk2 DEFAULT;
+INSERT into droppk VALUES (1), (1000), (1500), (2000);
+CREATE TABLE dropfk (a int REFERENCES droppk);
+INSERT into dropfk VALUES (1), (1000), (1500), (2000);
+-- these should all fail
+ALTER TABLE droppk DETACH PARTITION droppk_d;
+ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL: Key (a)=(2000) still referenced from table "dropfk".
+ALTER TABLE droppk2 DETACH PARTITION droppk2_d;
+ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL: Key (a)=(1500) still referenced from table "dropfk".
+ALTER TABLE droppk DETACH PARTITION droppk1;
+ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL: Key (a)=(1) still referenced from table "dropfk".
+ALTER TABLE droppk DETACH PARTITION droppk2;
+ERROR: removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2"
+DETAIL: Key (a)=(1000) still referenced from table "dropfk".
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL: Key (a)=(1000) still referenced from table "dropfk".
+-- dropping partitions is disallowed
+DROP TABLE droppk_d;
+ERROR: cannot drop table droppk_d because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk_d
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk2_d;
+ERROR: cannot drop table droppk2_d because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2_d
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk1;
+ERROR: cannot drop table droppk1 because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk1
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk2;
+ERROR: cannot drop table droppk2 because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk21;
+ERROR: cannot drop table droppk21 because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk21
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DELETE FROM dropfk;
+-- dropping partitions is disallowed, even when no referencing values
+DROP TABLE droppk_d;
+ERROR: cannot drop table droppk_d because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk_d
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk2_d;
+ERROR: cannot drop table droppk2_d because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2_d
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+DROP TABLE droppk1;
+ERROR: cannot drop table droppk1 because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk1
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+-- but DETACH is allowed, and DROP afterwards works
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+DROP TABLE droppk2;
+ERROR: cannot drop table droppk2 because other objects depend on it
+DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2
+HINT: Use DROP ... CASCADE to drop the dependent objects too.
+-- Verify that initial constraint creation and cloning behave correctly
+CREATE SCHEMA fkpart5;
+SET search_path TO fkpart5;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY LIST (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES IN (1);
+CREATE TABLE fk (a int) PARTITION BY LIST (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES IN (1);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE pk2 PARTITION OF pk FOR VALUES IN (2);
+CREATE TABLE pk3 (a int NOT NULL) PARTITION BY LIST (a);
+CREATE TABLE pk31 PARTITION OF pk3 FOR VALUES IN (31);
+CREATE TABLE pk32 (b int, a int NOT NULL);
+ALTER TABLE pk32 DROP COLUMN b;
+ALTER TABLE pk3 ATTACH PARTITION pk32 FOR VALUES IN (32);
+ALTER TABLE pk ATTACH PARTITION pk3 FOR VALUES IN (31, 32);
+CREATE TABLE fk2 PARTITION OF fk FOR VALUES IN (2);
+CREATE TABLE fk3 (b int, a int);
+ALTER TABLE fk3 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk3 FOR VALUES IN (3);
+SELECT pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass,
+ CASE WHEN conparentid <> 0 THEN pg_describe_object('pg_constraint'::regclass, conparentid, 0) ELSE 'TOP' END
+FROM pg_catalog.pg_constraint
+WHERE conrelid IN (SELECT relid FROM pg_partition_tree('fk'))
+ORDER BY conrelid::regclass::text, conname;
+ pg_describe_object | confrelid | case
+------------------------------------+-----------+-----------------------------------
+ constraint fk_a_fkey on table fk | pk | TOP
+ constraint fk_a_fkey1 on table fk | pk1 | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey2 on table fk | pk11 | constraint fk_a_fkey1 on table fk
+ constraint fk_a_fkey3 on table fk | pk2 | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey4 on table fk | pk3 | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey5 on table fk | pk31 | constraint fk_a_fkey4 on table fk
+ constraint fk_a_fkey6 on table fk | pk32 | constraint fk_a_fkey4 on table fk
+ constraint fk_a_fkey on table fk1 | pk | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey on table fk11 | pk | constraint fk_a_fkey on table fk1
+ constraint fk_a_fkey on table fk2 | pk | constraint fk_a_fkey on table fk
+ constraint fk_a_fkey on table fk3 | pk | constraint fk_a_fkey on table fk
+(11 rows)
+
+CREATE TABLE fk4 (LIKE fk);
+INSERT INTO fk4 VALUES (50);
+ALTER TABLE fk ATTACH PARTITION fk4 FOR VALUES IN (50);
+ERROR: insert or update on table "fk4" violates foreign key constraint "fk_a_fkey"
+DETAIL: Key (a)=(50) is not present in table "pk".
+-- Verify ON UPDATE/DELETE behavior
+CREATE SCHEMA fkpart6;
+SET search_path TO fkpart6;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES FROM (1) TO (50);
+CREATE TABLE pk12 PARTITION OF pk1 FOR VALUES FROM (50) TO (100);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE CASCADE ON DELETE CASCADE;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO pk VALUES (1);
+INSERT INTO fk VALUES (1);
+UPDATE pk SET a = 20;
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a
+----------+----
+ fk12 | 20
+(1 row)
+
+DELETE FROM pk WHERE a = 20;
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a
+----------+---
+(0 rows)
+
+DROP TABLE fk;
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (50);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET NULL ON DELETE SET NULL;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (50);
+UPDATE pk SET a = 21 WHERE a = 20;
+DELETE FROM pk WHERE a = 50;
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a
+----------+---
+ fk_d |
+ fk_d |
+(2 rows)
+
+DROP TABLE fk;
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30), (50);
+CREATE TABLE fk (id int, a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET DEFAULT ON DELETE SET DEFAULT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (1, 20), (2, 30);
+DELETE FROM pk WHERE a = 20 RETURNING *;
+ a
+----
+ 20
+(1 row)
+
+UPDATE pk SET a = 90 WHERE a = 30 RETURNING *;
+ a
+----
+ 90
+(1 row)
+
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | id | a
+----------+----+----
+ fk12 | 1 | 50
+ fk12 | 2 | 50
+(2 rows)
+
+DROP TABLE fk;
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30);
+CREATE TABLE fk (a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE RESTRICT ON DELETE RESTRICT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (30);
+DELETE FROM pk WHERE a = 20;
+ERROR: update or delete on table "pk11" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL: Key (a)=(20) is still referenced from table "fk".
+UPDATE pk SET a = 90 WHERE a = 30;
+ERROR: update or delete on table "pk11" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL: Key (a)=(30) is still referenced from table "fk".
+SELECT tableoid::regclass, * FROM fk;
+ tableoid | a
+----------+----
+ fk12 | 20
+ fk12 | 30
+(2 rows)
+
+DROP TABLE fk;
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index a5c7e147a7f..1a8685099e7 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1145,18 +1145,6 @@ drop table pktable2, fktable2;
-- Foreign keys and partitioned tables
--
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
- PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
- FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
- REFERENCES fk_partitioned_pk;
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-
-- Creation of a partitioned hierarchy with irregular definitions
CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
PRIMARY KEY (a, b));
@@ -1443,3 +1431,204 @@ alter table fkpart2.fk_part_1 drop constraint fkey; -- ok
alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- doesn't exist
drop schema fkpart0, fkpart1, fkpart2 cascade;
+
+-- Test a partitioned table as referenced table.
+
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions added (created and
+-- attached) after creating the foreign key.
+CREATE SCHEMA fkpart3;
+SET search_path TO fkpart3;
+
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE pk2 (b int, a int);
+ALTER TABLE pk2 DROP COLUMN b;
+ALTER TABLE pk2 ALTER a SET NOT NULL;
+ALTER TABLE pk ATTACH PARTITION pk2 FOR VALUES FROM (1000) TO (2000);
+
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (0) TO (750);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE fk2 (b int, a int) ;
+ALTER TABLE fk2 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk2 FOR VALUES FROM (750) TO (3500);
+
+CREATE TABLE pk3 PARTITION OF pk FOR VALUES FROM (2000) TO (3000);
+CREATE TABLE pk4 (LIKE pk);
+ALTER TABLE pk ATTACH PARTITION pk4 FOR VALUES FROM (3000) TO (4000);
+
+CREATE TABLE pk5 (c int, b int, a int NOT NULL) PARTITION BY RANGE (a);
+ALTER TABLE pk5 DROP COLUMN b, DROP COLUMN c;
+CREATE TABLE pk51 PARTITION OF pk5 FOR VALUES FROM (4000) TO (4500);
+CREATE TABLE pk52 PARTITION OF pk5 FOR VALUES FROM (4500) TO (5000);
+ALTER TABLE pk ATTACH PARTITION pk5 FOR VALUES FROM (4000) TO (5000);
+
+CREATE TABLE fk3 PARTITION OF fk FOR VALUES FROM (3500) TO (5000);
+
+-- these should fail: referenced value not present
+INSERT into fk VALUES (1);
+INSERT into fk VALUES (1000);
+INSERT into fk VALUES (2000);
+INSERT into fk VALUES (3000);
+INSERT into fk VALUES (4000);
+INSERT into fk VALUES (4500);
+-- insert into the referenced table, now they should work
+INSERT into pk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+INSERT into fk VALUES (1), (1000), (2000), (3000), (4000), (4500);
+
+-- should fail: referencing value present
+DELETE FROM pk WHERE a = 1;
+DELETE FROM pk WHERE a = 1000;
+DELETE FROM pk WHERE a = 2000;
+DELETE FROM pk WHERE a = 3000;
+DELETE FROM pk WHERE a = 4000;
+DELETE FROM pk WHERE a = 4500;
+UPDATE pk SET a = 2 WHERE a = 1;
+UPDATE pk SET a = 1002 WHERE a = 1000;
+UPDATE pk SET a = 2002 WHERE a = 2000;
+UPDATE pk SET a = 3002 WHERE a = 3000;
+UPDATE pk SET a = 4002 WHERE a = 4000;
+UPDATE pk SET a = 4502 WHERE a = 4500;
+-- now they should work
+DELETE FROM fk;
+UPDATE pk SET a = 2 WHERE a = 1;
+DELETE FROM pk WHERE a = 2;
+UPDATE pk SET a = 1002 WHERE a = 1000;
+DELETE FROM pk WHERE a = 1002;
+UPDATE pk SET a = 2002 WHERE a = 2000;
+DELETE FROM pk WHERE a = 2002;
+UPDATE pk SET a = 3002 WHERE a = 3000;
+DELETE FROM pk WHERE a = 3002;
+UPDATE pk SET a = 4002 WHERE a = 4000;
+DELETE FROM pk WHERE a = 4002;
+UPDATE pk SET a = 4502 WHERE a = 4500;
+DELETE FROM pk WHERE a = 4502;
+
+CREATE SCHEMA fkpart4;
+SET search_path TO fkpart4;
+-- dropping/detaching PARTITIONs is prevented if that would break
+-- a foreign key's existing data
+CREATE TABLE droppk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE droppk1 PARTITION OF droppk FOR VALUES FROM (0) TO (1000);
+CREATE TABLE droppk_d PARTITION OF droppk DEFAULT;
+CREATE TABLE droppk2 PARTITION OF droppk FOR VALUES FROM (1000) TO (2000)
+ PARTITION BY RANGE (a);
+CREATE TABLE droppk21 PARTITION OF droppk2 FOR VALUES FROM (1000) TO (1400);
+CREATE TABLE droppk2_d PARTITION OF droppk2 DEFAULT;
+INSERT into droppk VALUES (1), (1000), (1500), (2000);
+CREATE TABLE dropfk (a int REFERENCES droppk);
+INSERT into dropfk VALUES (1), (1000), (1500), (2000);
+-- these should all fail
+ALTER TABLE droppk DETACH PARTITION droppk_d;
+ALTER TABLE droppk2 DETACH PARTITION droppk2_d;
+ALTER TABLE droppk DETACH PARTITION droppk1;
+ALTER TABLE droppk DETACH PARTITION droppk2;
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+-- dropping partitions is disallowed
+DROP TABLE droppk_d;
+DROP TABLE droppk2_d;
+DROP TABLE droppk1;
+DROP TABLE droppk2;
+DROP TABLE droppk21;
+DELETE FROM dropfk;
+-- dropping partitions is disallowed, even when no referencing values
+DROP TABLE droppk_d;
+DROP TABLE droppk2_d;
+DROP TABLE droppk1;
+-- but DETACH is allowed, and DROP afterwards works
+ALTER TABLE droppk2 DETACH PARTITION droppk21;
+DROP TABLE droppk2;
+
+-- Verify that initial constraint creation and cloning behave correctly
+CREATE SCHEMA fkpart5;
+SET search_path TO fkpart5;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY LIST (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES IN (1);
+CREATE TABLE fk (a int) PARTITION BY LIST (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES IN (1) PARTITION BY LIST (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES IN (1);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
+CREATE TABLE pk2 PARTITION OF pk FOR VALUES IN (2);
+CREATE TABLE pk3 (a int NOT NULL) PARTITION BY LIST (a);
+CREATE TABLE pk31 PARTITION OF pk3 FOR VALUES IN (31);
+CREATE TABLE pk32 (b int, a int NOT NULL);
+ALTER TABLE pk32 DROP COLUMN b;
+ALTER TABLE pk3 ATTACH PARTITION pk32 FOR VALUES IN (32);
+ALTER TABLE pk ATTACH PARTITION pk3 FOR VALUES IN (31, 32);
+CREATE TABLE fk2 PARTITION OF fk FOR VALUES IN (2);
+CREATE TABLE fk3 (b int, a int);
+ALTER TABLE fk3 DROP COLUMN b;
+ALTER TABLE fk ATTACH PARTITION fk3 FOR VALUES IN (3);
+SELECT pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass,
+ CASE WHEN conparentid <> 0 THEN pg_describe_object('pg_constraint'::regclass, conparentid, 0) ELSE 'TOP' END
+FROM pg_catalog.pg_constraint
+WHERE conrelid IN (SELECT relid FROM pg_partition_tree('fk'))
+ORDER BY conrelid::regclass::text, conname;
+CREATE TABLE fk4 (LIKE fk);
+INSERT INTO fk4 VALUES (50);
+ALTER TABLE fk ATTACH PARTITION fk4 FOR VALUES IN (50);
+
+-- Verify ON UPDATE/DELETE behavior
+CREATE SCHEMA fkpart6;
+SET search_path TO fkpart6;
+CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES FROM (1) TO (50);
+CREATE TABLE pk12 PARTITION OF pk1 FOR VALUES FROM (50) TO (100);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE CASCADE ON DELETE CASCADE;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO pk VALUES (1);
+INSERT INTO fk VALUES (1);
+UPDATE pk SET a = 20;
+SELECT tableoid::regclass, * FROM fk;
+DELETE FROM pk WHERE a = 20;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;
+
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (50);
+CREATE TABLE fk (a int) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET NULL ON DELETE SET NULL;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (50);
+UPDATE pk SET a = 21 WHERE a = 20;
+DELETE FROM pk WHERE a = 50;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;
+
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30), (50);
+CREATE TABLE fk (id int, a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET DEFAULT ON DELETE SET DEFAULT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (1, 20), (2, 30);
+DELETE FROM pk WHERE a = 20 RETURNING *;
+UPDATE pk SET a = 90 WHERE a = 30 RETURNING *;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;
+
+TRUNCATE TABLE pk;
+INSERT INTO pk VALUES (20), (30);
+CREATE TABLE fk (a int DEFAULT 50) PARTITION BY RANGE (a);
+CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
+CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
+CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
+ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE RESTRICT ON DELETE RESTRICT;
+CREATE TABLE fk_d PARTITION OF fk DEFAULT;
+INSERT INTO fk VALUES (20), (30);
+DELETE FROM pk WHERE a = 20;
+UPDATE pk SET a = 90 WHERE a = 30;
+SELECT tableoid::regclass, * FROM fk;
+DROP TABLE fk;