1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Allow specifying column list for foreign key ON DELETE SET actions

Extend the foreign key ON DELETE actions SET NULL and SET DEFAULT by
allowing the specification of a column list, like

    CREATE TABLE posts (
        ...
        FOREIGN KEY (tenant_id, author_id) REFERENCES users ON DELETE SET NULL (author_id)
    );

If a column list is specified, only those columns are set to
null/default, instead of all the columns in the foreign-key
constraint.

This is useful for multitenant or sharded schemas, where the tenant or
shard ID is included in the primary key of all tables but shouldn't be
set to null.

Author: Paul Martinez <paulmtz@google.com>
Discussion: https://www.postgresql.org/message-id/flat/CACqFVBZQyMYJV=njbSMxf+rbDHpx=W=B7AEaMKn8dWn9OZJY7w@mail.gmail.com
This commit is contained in:
Peter Eisentraut
2021-12-08 11:09:44 +01:00
parent e464cb7af3
commit d6f96ed94e
22 changed files with 557 additions and 72 deletions

View File

@ -482,11 +482,16 @@ static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstra
Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks, int16 *pkattnum, int16 *fkattnum,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok);
static void validateFkOnDeleteSetColumns(int numfks, const int16 *fkattnums,
int numfksetcols, const int16 *fksetcolsattnums,
List *fksetcols);
static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks, int16 *pkattnum, int16 *fkattnum,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok, LOCKMODE lockmode);
static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
Relation partitionRel);
@ -8973,9 +8978,11 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid pfeqoperators[INDEX_MAX_KEYS];
Oid ppeqoperators[INDEX_MAX_KEYS];
Oid ffeqoperators[INDEX_MAX_KEYS];
int16 fkdelsetcols[INDEX_MAX_KEYS];
int i;
int numfks,
numpks;
numpks,
numfkdelsetcols;
Oid indexOid;
bool old_check_ok;
ObjectAddress address;
@ -9071,11 +9078,19 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
MemSet(pfeqoperators, 0, sizeof(pfeqoperators));
MemSet(ppeqoperators, 0, sizeof(ppeqoperators));
MemSet(ffeqoperators, 0, sizeof(ffeqoperators));
MemSet(fkdelsetcols, 0, sizeof(fkdelsetcols));
numfks = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_attrs,
fkattnum, fktypoid);
numfkdelsetcols = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_del_set_cols,
fkdelsetcols, NULL);
validateFkOnDeleteSetColumns(numfks, fkattnum,
numfkdelsetcols, fkdelsetcols,
fkconstraint->fk_del_set_cols);
/*
* If the attribute list for the referenced table was omitted, lookup the
* definition of the primary key and use it. Otherwise, validate the
@ -9350,6 +9365,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
pfeqoperators,
ppeqoperators,
ffeqoperators,
numfkdelsetcols,
fkdelsetcols,
old_check_ok);
/* Now handle the referencing side. */
@ -9362,6 +9379,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
pfeqoperators,
ppeqoperators,
ffeqoperators,
numfkdelsetcols,
fkdelsetcols,
old_check_ok,
lockmode);
@ -9373,6 +9392,40 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
return address;
}
/*
* validateFkActionSetColumns
* Verifies that columns used in ON DELETE SET NULL/DEFAULT (...)
* column lists are valid.
*/
void
validateFkOnDeleteSetColumns(int numfks, const int16 *fkattnums,
int numfksetcols, const int16 *fksetcolsattnums,
List *fksetcols)
{
for (int i = 0; i < numfksetcols; i++)
{
int16 setcol_attnum = fksetcolsattnums[i];
bool seen = false;
for (int j = 0; j < numfks; j++)
{
if (fkattnums[j] == setcol_attnum)
{
seen = true;
break;
}
}
if (!seen)
{
char *col = strVal(list_nth(fksetcols, i));
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("column \"%s\" referenced in ON DELETE SET action must be part of foreign key", col)));
}
}
}
/*
* addFkRecurseReferenced
* subroutine for ATAddForeignKeyConstraint; recurses on the referenced
@ -9394,6 +9447,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* numfks is the number of columns in the foreign key
* pkattnum is the attnum array of referenced attributes.
* fkattnum is the attnum array of referencing attributes.
* numfkdelsetcols is the number of columns in the ON DELETE SET NULL/DELETE
* (...) clause
* fkdelsetcols is the attnum array of the columns in the ON DELETE SET
* NULL/DELETE clause
* pf/pp/ffeqoperators are OID array of operators between columns.
* old_check_ok signals that this constraint replaces an existing one that
* was already validated (thus this one doesn't need validation).
@ -9403,7 +9460,9 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks,
int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok)
{
ObjectAddress address;
Oid constrOid;
@ -9478,6 +9537,8 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkdelsetcols,
numfkdelsetcols,
fkconstraint->fk_matchtype,
NULL, /* no exclusion constraint */
NULL, /* no check constraint */
@ -9559,6 +9620,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
partIndexId, constrOid, numfks,
mapped_pkattnum, fkattnum,
pfeqoperators, ppeqoperators, ffeqoperators,
numfkdelsetcols, fkdelsetcols,
old_check_ok);
/* Done -- clean up (but keep the lock) */
@ -9599,6 +9661,10 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
* pkattnum is the attnum array of referenced attributes.
* fkattnum is the attnum array of referencing attributes.
* pf/pp/ffeqoperators are OID array of operators between columns.
* numfkdelsetcols is the number of columns in the ON DELETE SET NULL/DELETE
* (...) clause
* fkdelsetcols is the attnum array of the columns in the ON DELETE SET
* NULL/DELETE clause
* old_check_ok signals that this constraint replaces an existing one that
* was already validated (thus this one doesn't need validation).
* lockmode is the lockmode to acquire on partitions when recursing.
@ -9608,6 +9674,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks, int16 *pkattnum, int16 *fkattnum,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
int numfkdelsetcols, int16 *fkdelsetcols,
bool old_check_ok, LOCKMODE lockmode)
{
AssertArg(OidIsValid(parentConstr));
@ -9746,6 +9813,8 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkdelsetcols,
numfkdelsetcols,
fkconstraint->fk_matchtype,
NULL,
NULL,
@ -9778,6 +9847,8 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
pfeqoperators,
ppeqoperators,
ffeqoperators,
numfkdelsetcols,
fkdelsetcols,
old_check_ok,
lockmode);
@ -9883,6 +9954,8 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
Oid conpfeqop[INDEX_MAX_KEYS];
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
int numfkdelsetcols;
AttrNumber confdelsetcols[INDEX_MAX_KEYS];
Constraint *fkconstraint;
tuple = SearchSysCache1(CONSTROID, constrOid);
@ -9915,7 +9988,9 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
confkey,
conpfeqop,
conppeqop,
conffeqop);
conffeqop,
&numfkdelsetcols,
confdelsetcols);
for (int i = 0; i < numfks; i++)
mapped_confkey[i] = attmap->attnums[confkey[i] - 1];
@ -9962,6 +10037,8 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
conpfeqop,
conppeqop,
conffeqop,
numfkdelsetcols,
confdelsetcols,
true);
table_close(fkRel, NoLock);
@ -10032,6 +10109,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
Oid conpfeqop[INDEX_MAX_KEYS];
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
int numfkdelsetcols;
AttrNumber confdelsetcols[INDEX_MAX_KEYS];
Constraint *fkconstraint;
bool attached;
Oid indexOid;
@ -10063,7 +10142,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
ShareRowExclusiveLock, NULL);
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
conpfeqop, conppeqop, conffeqop,
&numfkdelsetcols, confdelsetcols);
for (int i = 0; i < numfks; i++)
mapped_conkey[i] = attmap->attnums[conkey[i] - 1];
@ -10148,6 +10228,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
confdelsetcols,
numfkdelsetcols,
fkconstraint->fk_matchtype,
NULL,
NULL,
@ -10183,6 +10265,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
conpfeqop,
conppeqop,
conffeqop,
numfkdelsetcols,
confdelsetcols,
false, /* no old check exists */
AccessExclusiveLock);
table_close(pkrel, NoLock);
@ -10804,7 +10888,7 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
/*
* transformColumnNameList - transform list of column names
*
* Lookup each name and return its attnum and type OID
* Lookup each name and return its attnum and, optionally, type OID
*/
static int
transformColumnNameList(Oid relId, List *colList,
@ -10831,7 +10915,8 @@ transformColumnNameList(Oid relId, List *colList,
errmsg("cannot have more than %d keys in a foreign key",
INDEX_MAX_KEYS)));
attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum;
atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
if (atttypids != NULL)
atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
ReleaseSysCache(atttuple);
attnum++;
}