1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Fix collation handling for foreign keys

Allowing foreign keys where the referenced and the referencing columns
have collations with different notions of equality is problematic.
This can only happen when using nondeterministic collations, for
example, if the referencing column is case-insensitive and the
referenced column is not, or vice versa.  It does not happen if both
collations are deterministic.

To show one example:

    CREATE COLLATION case_insensitive (provider = icu, deterministic = false, locale = 'und-u-ks-level2');

    CREATE TABLE pktable (x text COLLATE "C" PRIMARY KEY);
    CREATE TABLE fktable (x text COLLATE case_insensitive REFERENCES pktable ON UPDATE CASCADE ON DELETE CASCADE);
    INSERT INTO pktable VALUES ('A'), ('a');
    INSERT INTO fktable VALUES ('A');

    BEGIN; DELETE FROM pktable WHERE x = 'a'; TABLE fktable; ROLLBACK;
    BEGIN; DELETE FROM pktable WHERE x = 'A'; TABLE fktable; ROLLBACK;

Both of these DELETE statements delete the one row from fktable.  So
this means that one row from fktable references two rows in pktable,
which should not happen.  (That's why a primary key or unique
constraint is required on pktable.)

When nondeterministic collations were implemented, the SQL standard
available to yours truly said that referential integrity checks should
be performed with the collation of the referenced column, and so
that's how we implemented it.  But this turned out to be a mistake in
the SQL standard, for the same reasons as above, that was later
(SQL:2016) fixed to require both collations to be the same.  So that's
what we are aiming for here.

We don't have to be quite so strict.  We can allow different
collations if they are both deterministic.  This is also good for
backward compatibility.

So the new rule is that the collations either have to be the same or
both deterministic.  Or in other words, if one of them is
nondeterministic, then both have to be the same.

Users upgrading from before that have affected setups will need to
make changes to their schemas (i.e., change one or both collations in
affected foreign-key relationships) before the upgrade will succeed.

Some of the nice test cases for the previous situation in
collate.icu.utf8.sql are now obsolete.  They are changed to just check
the error checking of the new rule.  Note that collate.sql already
contained a test for foreign keys with different deterministic
collations.

A bunch of code in ri_triggers.c that added a COLLATE clause to
enforce the referenced column's collation can be removed, because both
columns now have to have the same notion of equality, so it doesn't
matter which one to use.

Reported-by: Paul Jungwirth <pj@illuminatedcomputing.com>
Reviewed-by: Jian He <jian.universality@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/78d824e0-b21e-480d-a252-e4b84bc2c24b@illuminatedcomputing.com
This commit is contained in:
Peter Eisentraut
2024-11-15 14:52:28 +01:00
parent 90bcc7c2db
commit 9321d2fdf8
5 changed files with 104 additions and 177 deletions

View File

@ -398,10 +398,10 @@ static ObjectAddress ATExecValidateConstraint(List **wqueue,
Relation rel, char *constrName,
bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
int16 *attnums, Oid *atttypids, Oid *attcollids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
List **attnamelist,
int16 *attnums, Oid *atttypids,
int16 *attnums, Oid *atttypids, Oid *attcollids,
Oid *opclasses, bool *pk_has_without_overlaps);
static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
@ -9705,6 +9705,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
int16 fkattnum[INDEX_MAX_KEYS] = {0};
Oid pktypoid[INDEX_MAX_KEYS] = {0};
Oid fktypoid[INDEX_MAX_KEYS] = {0};
Oid pkcolloid[INDEX_MAX_KEYS] = {0};
Oid fkcolloid[INDEX_MAX_KEYS] = {0};
Oid opclasses[INDEX_MAX_KEYS] = {0};
Oid pfeqoperators[INDEX_MAX_KEYS] = {0};
Oid ppeqoperators[INDEX_MAX_KEYS] = {0};
@ -9801,11 +9803,11 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/*
* Look up the referencing attributes to make sure they exist, and record
* their attnums and type OIDs.
* their attnums and type and collation OIDs.
*/
numfks = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_attrs,
fkattnum, fktypoid);
fkattnum, fktypoid, fkcolloid);
with_period = fkconstraint->fk_with_period || fkconstraint->pk_with_period;
if (with_period && !fkconstraint->fk_with_period)
ereport(ERROR,
@ -9814,7 +9816,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
numfkdelsetcols = transformColumnNameList(RelationGetRelid(rel),
fkconstraint->fk_del_set_cols,
fkdelsetcols, NULL);
fkdelsetcols, NULL, NULL);
validateFkOnDeleteSetColumns(numfks, fkattnum,
numfkdelsetcols, fkdelsetcols,
fkconstraint->fk_del_set_cols);
@ -9823,13 +9825,14 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* If the attribute list for the referenced table was omitted, lookup the
* definition of the primary key and use it. Otherwise, validate the
* supplied attribute list. In either case, discover the index OID and
* index opclasses, and the attnums and type OIDs of the attributes.
* index opclasses, and the attnums and type and collation OIDs of the
* attributes.
*/
if (fkconstraint->pk_attrs == NIL)
{
numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid,
&fkconstraint->pk_attrs,
pkattnum, pktypoid,
pkattnum, pktypoid, pkcolloid,
opclasses, &pk_has_without_overlaps);
/* If the primary key uses WITHOUT OVERLAPS, the fk must use PERIOD */
@ -9842,7 +9845,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
{
numpks = transformColumnNameList(RelationGetRelid(pkrel),
fkconstraint->pk_attrs,
pkattnum, pktypoid);
pkattnum, pktypoid, pkcolloid);
/* Since we got pk_attrs, one should be a period. */
if (with_period && !fkconstraint->pk_with_period)
@ -9944,6 +9947,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid pktype = pktypoid[i];
Oid fktype = fktypoid[i];
Oid fktyped;
Oid pkcoll = pkcolloid[i];
Oid fkcoll = fkcolloid[i];
HeapTuple cla_ht;
Form_pg_opclass cla_tup;
Oid amid;
@ -10086,6 +10091,41 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
format_type_be(fktype),
format_type_be(pktype))));
/*
* This shouldn't be possible, but better check to make sure we have a
* consistent state for the check below.
*/
if ((OidIsValid(pkcoll) && !OidIsValid(fkcoll)) || (!OidIsValid(pkcoll) && OidIsValid(fkcoll)))
elog(ERROR, "key columns are not both collatable");
if (OidIsValid(pkcoll) && OidIsValid(fkcoll))
{
bool pkcolldet;
bool fkcolldet;
pkcolldet = get_collation_isdeterministic(pkcoll);
fkcolldet = get_collation_isdeterministic(fkcoll);
/*
* SQL requires that both collations are the same. This is
* because we need a consistent notion of equality on both
* columns. We relax this by allowing different collations if
* they are both deterministic. (This is also for backward
* compatibility, because PostgreSQL has always allowed this.)
*/
if ((!pkcolldet || !fkcolldet) && pkcoll != fkcoll)
ereport(ERROR,
(errcode(ERRCODE_COLLATION_MISMATCH),
errmsg("foreign key constraint \"%s\" cannot be implemented", fkconstraint->conname),
errdetail("Key columns \"%s\" of the referencing table and \"%s\" of the referenced table "
"have incompatible collations: \"%s\" and \"%s\". "
"If either collation is nondeterministic, then both collations have to be the same.",
strVal(list_nth(fkconstraint->fk_attrs, i)),
strVal(list_nth(fkconstraint->pk_attrs, i)),
get_collation_name(fkcoll),
get_collation_name(pkcoll))));
}
if (old_check_ok)
{
/*
@ -10106,6 +10146,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
CoercionPathType new_pathtype;
Oid old_castfunc;
Oid new_castfunc;
Oid old_fkcoll;
Oid new_fkcoll;
Form_pg_attribute attr = TupleDescAttr(tab->oldDesc,
fkattnum[i] - 1);
@ -10121,6 +10163,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
new_pathtype = findFkeyCast(pfeqop_right, new_fktype,
&new_castfunc);
old_fkcoll = attr->attcollation;
new_fkcoll = fkcoll;
/*
* Upon a change to the cast from the FK column to its pfeqop
* operand, revalidate the constraint. For this evaluation, a
@ -10144,9 +10189,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* turn conform to the domain. Consequently, we need not treat
* domains specially here.
*
* Since we require that all collations share the same notion of
* equality (which they do, because texteq reduces to bitwise
* equality), we don't compare collation here.
* If the collation changes, revalidation is required, unless both
* collations are deterministic, because those share the same
* notion of equality (because texteq reduces to bitwise
* equality).
*
* We need not directly consider the PK type. It's necessarily
* binary coercible to the opcintype of the unique index column,
@ -10156,7 +10202,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
old_check_ok = (new_pathtype == old_pathtype &&
new_castfunc == old_castfunc &&
(!IsPolymorphicType(pfeqop_right) ||
new_fktype == old_fktype));
new_fktype == old_fktype) &&
(new_fkcoll == old_fkcoll ||
(get_collation_isdeterministic(old_fkcoll) && get_collation_isdeterministic(new_fkcoll))));
}
pfeqoperators[i] = pfeqop;
@ -12092,7 +12140,8 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
/*
* transformColumnNameList - transform list of column names
*
* Lookup each name and return its attnum and, optionally, type OID
* Lookup each name and return its attnum and, optionally, type and collation
* OIDs
*
* Note: the name of this function suggests that it's general-purpose,
* but actually it's only used to look up names appearing in foreign-key
@ -12101,7 +12150,7 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
*/
static int
transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids)
int16 *attnums, Oid *atttypids, Oid *attcollids)
{
ListCell *l;
int attnum;
@ -12132,6 +12181,8 @@ transformColumnNameList(Oid relId, List *colList,
attnums[attnum] = attform->attnum;
if (atttypids != NULL)
atttypids[attnum] = attform->atttypid;
if (attcollids != NULL)
attcollids[attnum] = attform->attcollation;
ReleaseSysCache(atttuple);
attnum++;
}
@ -12142,7 +12193,7 @@ transformColumnNameList(Oid relId, List *colList,
/*
* transformFkeyGetPrimaryKey -
*
* Look up the names, attnums, and types of the primary key attributes
* Look up the names, attnums, types, and collations of the primary key attributes
* for the pkrel. Also return the index OID and index opclasses of the
* index supporting the primary key. Also return whether the index has
* WITHOUT OVERLAPS.
@ -12155,7 +12206,7 @@ transformColumnNameList(Oid relId, List *colList,
static int
transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
List **attnamelist,
int16 *attnums, Oid *atttypids,
int16 *attnums, Oid *atttypids, Oid *attcollids,
Oid *opclasses, bool *pk_has_without_overlaps)
{
List *indexoidlist;
@ -12229,6 +12280,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
attnums[i] = pkattno;
atttypids[i] = attnumTypeId(pkrel, pkattno);
attcollids[i] = attnumCollationId(pkrel, pkattno);
opclasses[i] = indclass->values[i];
*attnamelist = lappend(*attnamelist,
makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));