diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index cfd131460d1..55ba026e028 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -39,10 +39,6 @@ #include "utils/tqual.h" -static void clone_fk_constraints(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned); - - /* * CreateConstraintEntry * Create a constraint table entry. @@ -377,306 +373,6 @@ CreateConstraintEntry(const char *constraintName, return conOid; } -/* - * CloneForeignKeyConstraints - * Clone foreign keys from a partitioned table to a newly acquired - * partition. - * - * relationId is a partition of parentId, so we can be certain that it has the - * same columns with the same datatypes. The columns may be in different - * order, though. - * - * The *cloned list is appended ClonedConstraint elements describing what was - * created. - */ -void -CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) -{ - Relation pg_constraint; - Relation parentRel; - Relation rel; - ScanKeyData key; - SysScanDesc scan; - HeapTuple tuple; - List *clone = NIL; - - parentRel = heap_open(parentId, NoLock); /* already got lock */ - /* see ATAddForeignKeyConstraint about lock level */ - rel = heap_open(relationId, AccessExclusiveLock); - pg_constraint = heap_open(ConstraintRelationId, RowShareLock); - - /* Obtain the list of constraints to clone or attach */ - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(parentId)); - scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &key); - while ((tuple = systable_getnext(scan)) != NULL) - { - Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; - - clone = lappend_oid(clone, oid); - } - systable_endscan(scan); - - /* Do the actual work, recursing to partitions as needed */ - clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned); - - /* We're done. Clean up */ - heap_close(parentRel, NoLock); - heap_close(rel, NoLock); /* keep lock till commit */ - heap_close(pg_constraint, RowShareLock); -} - -/* - * clone_fk_constraints - * Recursive subroutine for CloneForeignKeyConstraints - * - * Clone the given list of FK constraints when a partition is attached. - * - * When cloning foreign keys to a partition, it may happen that equivalent - * constraints already exist in the partition for some of them. We can skip - * creating a clone in that case, and instead just attach the existing - * constraint to the one in the parent. - * - * This function recurses to partitions, if the new partition is partitioned; - * of course, only do this for FKs that were actually cloned. - */ -static void -clone_fk_constraints(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned) -{ - AttrNumber *attmap; - List *partFKs; - List *subclone = NIL; - ListCell *cell; - - /* - * The constraint key may differ, if the columns in the partition are - * different. This map is used to convert them. - */ - attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), - RelationGetDescr(parentRel), - gettext_noop("could not convert row type")); - - partFKs = copyObject(RelationGetFKeyList(partRel)); - - foreach(cell, clone) - { - Oid parentConstrOid = lfirst_oid(cell); - Form_pg_constraint constrForm; - HeapTuple tuple; - AttrNumber conkey[INDEX_MAX_KEYS]; - AttrNumber mapped_conkey[INDEX_MAX_KEYS]; - AttrNumber confkey[INDEX_MAX_KEYS]; - Oid conpfeqop[INDEX_MAX_KEYS]; - Oid conppeqop[INDEX_MAX_KEYS]; - Oid conffeqop[INDEX_MAX_KEYS]; - Constraint *fkconstraint; - bool attach_it; - Oid constrOid; - ObjectAddress parentAddr, - childAddr; - int nelem; - ListCell *cell; - int i; - - tuple = SearchSysCache1(CONSTROID, parentConstrOid); - if (!tuple) - elog(ERROR, "cache lookup failed for constraint %u", - parentConstrOid); - constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - - /* only foreign keys */ - if (constrForm->contype != CONSTRAINT_FOREIGN) - { - ReleaseSysCache(tuple); - continue; - } - - ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); - - DeconstructFkConstraintRow(tuple, &nelem, conkey, confkey, - conpfeqop, conppeqop, conffeqop); - for (i = 0; i < nelem; i++) - mapped_conkey[i] = attmap[conkey[i] - 1]; - - /* - * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive - * verification step and don't end up with a duplicate FK. This also - * means we don't consider this constraint when recursing to - * partitions. - */ - attach_it = false; - foreach(cell, partFKs) - { - ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem) - { - attach_it = false; - continue; - } - for (i = 0; i < nelem; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) - { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* looks good! Attach this constraint */ - ConstraintSetParentConstraint(fk->conoid, constrForm->oid); - CommandCounterIncrement(); - attach_it = true; - break; - } - - /* - * If we attached to an existing constraint, there is no need to - * create a new one. In fact, there's no need to recurse for this - * constraint to partitions, either. - */ - if (attach_it) - { - ReleaseSysCache(tuple); - continue; - } - - constrOid = - CreateConstraintEntry(NameStr(constrForm->conname), - constrForm->connamespace, - CONSTRAINT_FOREIGN, - constrForm->condeferrable, - constrForm->condeferred, - constrForm->convalidated, - constrForm->oid, - RelationGetRelid(partRel), - mapped_conkey, - nelem, - nelem, - InvalidOid, /* not a domain constraint */ - constrForm->conindid, /* same index */ - constrForm->confrelid, /* same foreign rel */ - confkey, - conpfeqop, - conppeqop, - conffeqop, - nelem, - constrForm->confupdtype, - constrForm->confdeltype, - constrForm->confmatchtype, - NULL, - NULL, - NULL, - false, - 1, false, true); - subclone = lappend_oid(subclone, constrOid); - - ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); - recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); - - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); - fkconstraint->fk_upd_action = constrForm->confupdtype; - fkconstraint->fk_del_action = constrForm->confdeltype; - fkconstraint->deferrable = constrForm->condeferrable; - fkconstraint->initdeferred = constrForm->condeferred; - - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); - - if (cloned) - { - ClonedConstraint *newc; - - /* - * Feed back caller about the constraints we created, so that they - * can set up constraint verification. - */ - newc = palloc(sizeof(ClonedConstraint)); - newc->relid = RelationGetRelid(partRel); - newc->refrelid = constrForm->confrelid; - newc->conindid = constrForm->conindid; - newc->conid = constrOid; - newc->constraint = fkconstraint; - - *cloned = lappend(*cloned, newc); - } - - ReleaseSysCache(tuple); - } - - pfree(attmap); - list_free_deep(partFKs); - - /* - * If the partition is partitioned, recurse to handle any constraints that - * were cloned. - */ - if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - subclone != NIL) - { - PartitionDesc partdesc = RelationGetPartitionDesc(partRel); - int i; - - for (i = 0; i < partdesc->nparts; i++) - { - Relation childRel; - - childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); - clone_fk_constraints(pg_constraint, - partRel, - childRel, - subclone, - cloned); - heap_close(childRel, NoLock); /* keep lock till commit */ - } - } -} - /* * Test whether given name is currently used as a constraint name * for the given object (relation or domain). diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index d2781cbf194..1e108fd61b5 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -412,6 +412,10 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static void CloneForeignKeyConstraints(Oid parentId, Oid relationId, + List **cloned); +static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -7689,6 +7693,308 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, return address; } +/* + * CloneForeignKeyConstraints + * Clone foreign keys from a partitioned table to a newly acquired + * partition. + * + * relationId is a partition of parentId, so we can be certain that it has the + * same columns with the same datatypes. The columns may be in different + * order, though. + * + * The *cloned list is appended ClonedConstraint elements describing what was + * created, for the purposes of validating the constraint in ALTER TABLE's + * Phase 3. + */ +static void +CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) +{ + Relation pg_constraint; + Relation parentRel; + Relation rel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tuple; + List *clone = NIL; + + parentRel = heap_open(parentId, NoLock); /* already got lock */ + /* see ATAddForeignKeyConstraint about lock level */ + rel = heap_open(relationId, AccessExclusiveLock); + pg_constraint = heap_open(ConstraintRelationId, RowShareLock); + + /* Obtain the list of constraints to clone or attach */ + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(parentId)); + scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, + NULL, 1, &key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; + + clone = lappend_oid(clone, oid); + } + systable_endscan(scan); + + /* Do the actual work, recursing to partitions as needed */ + CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + + /* We're done. Clean up */ + heap_close(parentRel, NoLock); + heap_close(rel, NoLock); /* keep lock till commit */ + heap_close(pg_constraint, RowShareLock); +} + +/* + * CloneFkReferencing + * Recursive subroutine for CloneForeignKeyConstraints, referencing side + * + * Clone the given list of FK constraints when a partition is attached on the + * referencing side of those constraints. + * + * When cloning foreign keys to a partition, it may happen that equivalent + * constraints already exist in the partition for some of them. We can skip + * creating a clone in that case, and instead just attach the existing + * constraint to the one in the parent. + * + * This function recurses to partitions, if the new partition is partitioned; + * of course, only do this for FKs that were actually cloned. + */ +static void +CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned) +{ + AttrNumber *attmap; + List *partFKs; + List *subclone = NIL; + ListCell *cell; + + /* + * The constraint key may differ, if the columns in the partition are + * different. This map is used to convert them. + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + + partFKs = copyObject(RelationGetFKeyList(partRel)); + + foreach(cell, clone) + { + Oid parentConstrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + HeapTuple tuple; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_conkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + bool attach_it; + Oid constrOid; + ObjectAddress parentAddr, + childAddr; + ListCell *cell; + int i; + + tuple = SearchSysCache1(CONSTROID, parentConstrOid); + if (!tuple) + elog(ERROR, "cache lookup failed for constraint %u", + parentConstrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* only foreign keys */ + if (constrForm->contype != CONSTRAINT_FOREIGN) + { + ReleaseSysCache(tuple); + continue; + } + + ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); + + DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, + conpfeqop, conppeqop, conffeqop); + for (i = 0; i < numfks; i++) + mapped_conkey[i] = attmap[conkey[i] - 1]; + + /* + * Before creating a new constraint, see whether any existing FKs are + * fit for the purpose. If one is, attach the parent constraint to it, + * and don't clone anything. This way we avoid the expensive + * verification step and don't end up with a duplicate FK. This also + * means we don't consider this constraint when recursing to + * partitions. + */ + attach_it = false; + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); + Form_pg_constraint partConstr; + HeapTuple partcontup; + + attach_it = true; + + /* + * Do some quick & easy initial checks. If any of these fail, we + * cannot use this constraint, but keep looking. + */ + if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) + { + attach_it = false; + continue; + } + for (i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + attach_it = false; + break; + } + } + if (!attach_it) + continue; + + /* + * Looks good so far; do some more extensive checks. Presumably + * the check for 'convalidated' could be dropped, since we don't + * really care about that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != constrForm->condeferrable || + partConstr->condeferred != constrForm->condeferred || + partConstr->confupdtype != constrForm->confupdtype || + partConstr->confdeltype != constrForm->confdeltype || + partConstr->confmatchtype != constrForm->confmatchtype) + { + ReleaseSysCache(partcontup); + attach_it = false; + continue; + } + + ReleaseSysCache(partcontup); + + /* looks good! Attach this constraint */ + ConstraintSetParentConstraint(fk->conoid, parentConstrOid); + CommandCounterIncrement(); + attach_it = true; + break; + } + + /* + * If we attached to an existing constraint, there is no need to + * create a new one. In fact, there's no need to recurse for this + * constraint to partitions, either. + */ + if (attach_it) + { + ReleaseSysCache(tuple); + continue; + } + + constrOid = + CreateConstraintEntry(NameStr(constrForm->conname), + constrForm->connamespace, + CONSTRAINT_FOREIGN, + constrForm->condeferrable, + constrForm->condeferred, + constrForm->convalidated, + parentConstrOid, + RelationGetRelid(partRel), + mapped_conkey, + numfks, + numfks, + InvalidOid, /* not a domain constraint */ + constrForm->conindid, /* same index */ + constrForm->confrelid, /* same foreign rel */ + confkey, + conpfeqop, + conppeqop, + conffeqop, + numfks, + constrForm->confupdtype, + constrForm->confdeltype, + constrForm->confmatchtype, + NULL, + NULL, + NULL, + false, + 1, false, true); + subclone = lappend_oid(subclone, constrOid); + + ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); + recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + + createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, + constrOid, constrForm->conindid, false); + + if (cloned) + { + ClonedConstraint *newc; + + /* + * Feed back caller about the constraints we created, so that they + * can set up constraint verification. + */ + newc = palloc(sizeof(ClonedConstraint)); + newc->relid = RelationGetRelid(partRel); + newc->refrelid = constrForm->confrelid; + newc->conindid = constrForm->conindid; + newc->conid = constrOid; + newc->constraint = fkconstraint; + + *cloned = lappend(*cloned, newc); + } + + ReleaseSysCache(tuple); + } + + pfree(attmap); + list_free_deep(partFKs); + + /* + * If the partition is partitioned, recurse to handle any constraints that + * were cloned. + */ + if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + subclone != NIL) + { + PartitionDesc partdesc = RelationGetPartitionDesc(partRel); + int i; + + for (i = 0; i < partdesc->nparts; i++) + { + Relation childRel; + + childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); + CloneFkReferencing(pg_constraint, + partRel, + childRel, + subclone, + cloned); + heap_close(childRel, NoLock); /* keep lock till commit */ + } + } +} + /* * ALTER TABLE ALTER CONSTRAINT * diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index 0944dc8be74..55a2694b101 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -226,9 +226,6 @@ extern Oid CreateConstraintEntry(const char *constraintName, bool conNoInherit, bool is_internal); -extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId, - List **cloned); - extern void RemoveConstraintById(Oid conId); extern void RenameConstraintById(Oid conId, const char *newname);