1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Move most of the error checking for foreign-key constraints out of

parse analysis and into the execution code (in tablecmds.c).  This
eliminates a lot of unreasonably complex code that needed to have two
or more execution paths in case it was dealing with a not-yet-created
table column vs. an already-existing one.  The execution code is always
dealing with already-created tables and so needs only one case.  This
also eliminates some potential race conditions (the table wasn't locked
between parse analysis and execution), makes it easy to fix the gripe
about wrong referenced-column names generating a misleading error message,
and lets us easily add a dependency from the foreign-key constraint to
the unique index that it requires the referenced table to have.  (Cf.
complaint from Kris Jurka 12-Sep-2002 on pgsql-bugs.)

Also, third try at building a deletion mechanism that is not sensitive
to the order in which pg_depend entries are visited.  Adding the above-
mentioned dependency exposed the folly of what dependency.c had been
doing: it failed for cases where B depends on C while both auto-depend
on A.  Dropping A should succeed in this case, but was failing if C
happened to be visited before B.  It appears the only solution is two
separate walks over the dependency tree.
This commit is contained in:
Tom Lane
2002-09-22 00:37:09 +00:00
parent e303a2dbe8
commit ac355d558e
10 changed files with 770 additions and 813 deletions

View File

@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/src/backend/parser/analyze.c,v 1.249 2002/09/18 21:35:21 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/parser/analyze.c,v 1.250 2002/09/22 00:37:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -117,10 +117,7 @@ static List *getSetColTypes(ParseState *pstate, Node *node);
static void transformForUpdate(Query *qry, List *forUpdate);
static void transformConstraintAttrs(List *constraintList);
static void transformColumnType(ParseState *pstate, ColumnDef *column);
static void transformFkeyCheckAttrs(FkConstraint *fkconstraint, Oid *pktypoid);
static void transformFkeyGetPrimaryKey(FkConstraint *fkconstraint, Oid *pktypoid);
static bool relationHasPrimaryKey(Oid relationOid);
static Oid transformFkeyGetColType(CreateStmtContext *cxt, char *colname);
static void release_pstate_resources(ParseState *pstate);
static FromExpr *makeFromExpr(List *fromlist, Node *quals);
@ -1301,189 +1298,42 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
static void
transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt)
{
List *fkactions = NIL;
List *fkclist;
if (cxt->fkconstraints == NIL)
return;
elog(NOTICE, "%s will create implicit trigger(s) for FOREIGN KEY check(s)",
cxt->stmtType);
foreach(fkclist, cxt->fkconstraints)
{
FkConstraint *fkconstraint = (FkConstraint *) lfirst(fkclist);
Oid pktypoid[INDEX_MAX_KEYS];
Oid fktypoid[INDEX_MAX_KEYS];
int i;
int attnum;
List *fkattrs;
for (attnum = 0; attnum < INDEX_MAX_KEYS; attnum++)
pktypoid[attnum] = fktypoid[attnum] = InvalidOid;
/*
* Look up the referencing attributes to make sure they exist (or
* will exist) in this table, and remember their type OIDs.
*/
attnum = 0;
foreach(fkattrs, fkconstraint->fk_attrs)
{
char *fkattr = strVal(lfirst(fkattrs));
if (attnum >= INDEX_MAX_KEYS)
elog(ERROR, "Can only have %d keys in a foreign key",
INDEX_MAX_KEYS);
fktypoid[attnum++] = transformFkeyGetColType(cxt, fkattr);
}
/*
* If the attribute list for the referenced table was omitted,
* lookup the definition of the primary key.
*/
if (fkconstraint->pk_attrs == NIL)
{
if (strcmp(fkconstraint->pktable->relname, cxt->relation->relname) != 0)
transformFkeyGetPrimaryKey(fkconstraint, pktypoid);
else if (cxt->pkey != NULL)
{
/* Use the to-be-created primary key */
List *attr;
attnum = 0;
foreach(attr, cxt->pkey->indexParams)
{
IndexElem *ielem = lfirst(attr);
char *iname = ielem->name;
Assert(iname); /* no func index here */
fkconstraint->pk_attrs = lappend(fkconstraint->pk_attrs,
makeString(iname));
if (attnum >= INDEX_MAX_KEYS)
elog(ERROR, "Can only have %d keys in a foreign key",
INDEX_MAX_KEYS);
pktypoid[attnum++] = transformFkeyGetColType(cxt,
iname);
}
}
else
{
/* In ALTER TABLE case, primary key may already exist */
if (OidIsValid(cxt->relOid))
transformFkeyGetPrimaryKey(fkconstraint, pktypoid);
else
elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found",
fkconstraint->pktable->relname);
}
}
else
{
/* Validate the specified referenced key list */
if (strcmp(fkconstraint->pktable->relname, cxt->relation->relname) != 0)
transformFkeyCheckAttrs(fkconstraint, pktypoid);
else
{
/* Look for a matching new unique/primary constraint */
List *index;
bool found = false;
foreach(index, cxt->alist)
{
IndexStmt *ind = lfirst(index);
List *pkattrs;
if (!ind->unique)
continue;
if (length(ind->indexParams) !=
length(fkconstraint->pk_attrs))
continue;
attnum = 0;
foreach(pkattrs, fkconstraint->pk_attrs)
{
char *pkattr = strVal(lfirst(pkattrs));
List *indparms;
found = false;
foreach(indparms, ind->indexParams)
{
IndexElem *indparm = lfirst(indparms);
if (indparm->name &&
strcmp(indparm->name, pkattr) == 0)
{
found = true;
break;
}
}
if (!found)
break;
if (attnum >= INDEX_MAX_KEYS)
elog(ERROR, "Can only have %d keys in a foreign key",
INDEX_MAX_KEYS);
pktypoid[attnum++] = transformFkeyGetColType(cxt,
pkattr);
}
if (found)
break;
}
if (!found)
{
/*
* In ALTER TABLE case, such an index may already
* exist
*/
if (OidIsValid(cxt->relOid))
transformFkeyCheckAttrs(fkconstraint, pktypoid);
else
elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found",
fkconstraint->pktable->relname);
}
}
}
/* Be sure referencing and referenced column types are comparable */
for (i = 0; i < INDEX_MAX_KEYS && fktypoid[i] != 0; i++)
{
/*
* fktypoid[i] is the foreign key table's i'th element's type
* pktypoid[i] is the primary key table's i'th element's type
*
* We let oper() do our work for us, including elog(ERROR) if the
* types don't compare with =
*/
Operator o = oper(makeList1(makeString("=")),
fktypoid[i], pktypoid[i], false);
ReleaseSysCache(o);
}
/*
* For ALTER TABLE ADD CONSTRAINT, we're done. For CREATE TABLE,
* gin up an ALTER TABLE ADD CONSTRAINT command to execute after
* the basic CREATE TABLE is complete.
*/
if (strcmp(cxt->stmtType, "CREATE TABLE") == 0)
{
AlterTableStmt *alterstmt = makeNode(AlterTableStmt);
alterstmt->subtype = 'c'; /* preprocessed add constraint */
alterstmt->relation = cxt->relation;
alterstmt->name = NULL;
alterstmt->def = (Node *) makeList1(fkconstraint);
/* Don't need to scan the table contents in this case */
fkconstraint->skip_validation = true;
fkactions = lappend(fkactions, (Node *) alterstmt);
}
}
/*
* Attach completed list of extra actions to cxt->alist. We cannot do
* this earlier, because we assume above that cxt->alist still holds
* only IndexStmts.
* For ALTER TABLE ADD CONSTRAINT, nothing to do. For CREATE TABLE,
* gin up an ALTER TABLE ADD CONSTRAINT command to execute after
* the basic CREATE TABLE is complete.
*
* Note: the ADD CONSTRAINT command must also execute after any index
* creation commands. Thus, this should run after
* transformIndexConstraints, so that the CREATE INDEX commands are
* already in cxt->alist.
*/
cxt->alist = nconc(cxt->alist, fkactions);
if (strcmp(cxt->stmtType, "CREATE TABLE") == 0)
{
AlterTableStmt *alterstmt = makeNode(AlterTableStmt);
List *fkclist;
alterstmt->subtype = 'c'; /* preprocessed add constraint */
alterstmt->relation = cxt->relation;
alterstmt->name = NULL;
alterstmt->def = (Node *) cxt->fkconstraints;
/* Don't need to scan the table contents in this case */
foreach(fkclist, cxt->fkconstraints)
{
FkConstraint *fkconstraint = (FkConstraint *) lfirst(fkclist);
fkconstraint->skip_validation = true;
}
cxt->alist = lappend(cxt->alist, (Node *) alterstmt);
}
}
/*
@ -2375,6 +2225,7 @@ static Query *
transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt,
List **extras_before, List **extras_after)
{
Relation rel;
CreateStmtContext cxt;
Query *qry;
@ -2382,14 +2233,20 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt,
* The only subtypes that currently require parse transformation
* handling are 'A'dd column and Add 'C'onstraint. These largely
* re-use code from CREATE TABLE.
*
* If we need to do any parse transformation, get exclusive lock on
* the relation to make sure it won't change before we execute the
* command.
*/
switch (stmt->subtype)
{
case 'A':
rel = heap_openrv(stmt->relation, AccessExclusiveLock);
cxt.stmtType = "ALTER TABLE";
cxt.relation = stmt->relation;
cxt.inhRelations = NIL;
cxt.relOid = RangeVarGetRelid(stmt->relation, false);
cxt.relOid = RelationGetRelid(rel);
cxt.hasoids = SearchSysCacheExists(ATTNUM,
ObjectIdGetDatum(cxt.relOid),
Int16GetDatum(ObjectIdAttributeNumber),
@ -2412,13 +2269,17 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt,
((ColumnDef *) stmt->def)->constraints = cxt.ckconstraints;
*extras_before = nconc(*extras_before, cxt.blist);
*extras_after = nconc(cxt.alist, *extras_after);
heap_close(rel, NoLock); /* close rel, keep lock */
break;
case 'C':
rel = heap_openrv(stmt->relation, AccessExclusiveLock);
cxt.stmtType = "ALTER TABLE";
cxt.relation = stmt->relation;
cxt.inhRelations = NIL;
cxt.relOid = RangeVarGetRelid(stmt->relation, false);
cxt.relOid = RelationGetRelid(rel);
cxt.hasoids = SearchSysCacheExists(ATTNUM,
ObjectIdGetDatum(cxt.relOid),
Int16GetDatum(ObjectIdAttributeNumber),
@ -2446,6 +2307,8 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt,
stmt->def = (Node *) nconc(cxt.ckconstraints, cxt.fkconstraints);
*extras_before = nconc(*extras_before, cxt.blist);
*extras_after = nconc(cxt.alist, *extras_after);
heap_close(rel, NoLock); /* close rel, keep lock */
break;
case 'c':
@ -2674,174 +2537,6 @@ transformForUpdate(Query *qry, List *forUpdate)
}
/*
* transformFkeyCheckAttrs -
*
* Make sure that the attributes of a referenced table
* belong to a unique (or primary key) constraint.
*/
static void
transformFkeyCheckAttrs(FkConstraint *fkconstraint, Oid *pktypoid)
{
Relation pkrel;
List *indexoidlist,
*indexoidscan;
int i;
bool found = false;
/*
* Open the referenced table
*/
pkrel = heap_openrv(fkconstraint->pktable, AccessShareLock);
if (pkrel->rd_rel->relkind != RELKIND_RELATION)
elog(ERROR, "Referenced relation \"%s\" is not a table",
fkconstraint->pktable->relname);
/*
* Get the list of index OIDs for the table from the relcache, and
* look up each one in the pg_index syscache for each unique one, and
* then compare the attributes we were given to those defined.
*/
indexoidlist = RelationGetIndexList(pkrel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirsti(indexoidscan);
HeapTuple indexTuple;
Form_pg_index indexStruct;
found = false;
indexTuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexoid),
0, 0, 0);
if (!HeapTupleIsValid(indexTuple))
elog(ERROR, "transformFkeyCheckAttrs: index %u not found",
indexoid);
indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
if (indexStruct->indisunique)
{
for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
;
if (i == length(fkconstraint->pk_attrs))
{
/* go through the fkconstraint->pk_attrs list */
List *attrl;
int attnum = 0;
foreach(attrl, fkconstraint->pk_attrs)
{
char *attrname = strVal(lfirst(attrl));
found = false;
for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
{
int pkattno = indexStruct->indkey[i];
if (namestrcmp(attnumAttName(pkrel, pkattno),
attrname) == 0)
{
pktypoid[attnum++] = attnumTypeId(pkrel, pkattno);
found = true;
break;
}
}
if (!found)
break;
}
}
}
ReleaseSysCache(indexTuple);
if (found)
break;
}
if (!found)
elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found",
fkconstraint->pktable->relname);
freeList(indexoidlist);
heap_close(pkrel, AccessShareLock);
}
/*
* transformFkeyGetPrimaryKey -
*
* Try to find the primary key attributes of a referenced table if
* the column list in the REFERENCES specification was omitted.
*/
static void
transformFkeyGetPrimaryKey(FkConstraint *fkconstraint, Oid *pktypoid)
{
Relation pkrel;
List *indexoidlist,
*indexoidscan;
HeapTuple indexTuple = NULL;
Form_pg_index indexStruct = NULL;
int i;
int attnum = 0;
/*
* Open the referenced table
*/
pkrel = heap_openrv(fkconstraint->pktable, AccessShareLock);
if (pkrel->rd_rel->relkind != RELKIND_RELATION)
elog(ERROR, "Referenced relation \"%s\" is not a table",
fkconstraint->pktable->relname);
/*
* Get the list of index OIDs for the table from the relcache, and
* look up each one in the pg_index syscache until we find one marked
* primary key (hopefully there isn't more than one such).
*/
indexoidlist = RelationGetIndexList(pkrel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirsti(indexoidscan);
indexTuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexoid),
0, 0, 0);
if (!HeapTupleIsValid(indexTuple))
elog(ERROR, "transformFkeyGetPrimaryKey: index %u not found",
indexoid);
indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
if (indexStruct->indisprimary)
break;
ReleaseSysCache(indexTuple);
indexStruct = NULL;
}
freeList(indexoidlist);
/*
* Check that we found it
*/
if (indexStruct == NULL)
elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found",
fkconstraint->pktable->relname);
/*
* Now build the list of PK attributes from the indkey definition
* using the attribute names of the PK relation descriptor
*/
for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++)
{
int pkattno = indexStruct->indkey[i];
pktypoid[attnum++] = attnumTypeId(pkrel, pkattno);
fkconstraint->pk_attrs = lappend(fkconstraint->pk_attrs,
makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno)))));
}
ReleaseSysCache(indexTuple);
heap_close(pkrel, AccessShareLock);
}
/*
* relationHasPrimaryKey -
*
@ -2888,79 +2583,6 @@ relationHasPrimaryKey(Oid relationOid)
return result;
}
/*
* transformFkeyGetColType -
*
* Find a referencing column by name, and return its type OID.
* Error if it can't be found.
*/
static Oid
transformFkeyGetColType(CreateStmtContext *cxt, char *colname)
{
List *cols;
List *inher;
Oid result;
Form_pg_attribute sysatt;
/* First look for column among the newly-created columns */
foreach(cols, cxt->columns)
{
ColumnDef *col = lfirst(cols);
if (strcmp(col->colname, colname) == 0)
return typenameTypeId(col->typename);
}
/* Perhaps it's a system column name */
sysatt = SystemAttributeByName(colname, cxt->hasoids);
if (sysatt)
return sysatt->atttypid;
/* Look for column among inherited columns (if CREATE TABLE case) */
foreach(inher, cxt->inhRelations)
{
RangeVar *inh = lfirst(inher);
Relation rel;
int count;
Assert(IsA(inh, RangeVar));
rel = heap_openrv(inh, AccessShareLock);
if (rel->rd_rel->relkind != RELKIND_RELATION)
elog(ERROR, "inherited table \"%s\" is not a relation",
inh->relname);
for (count = 0; count < rel->rd_att->natts; count++)
{
Form_pg_attribute inhattr = rel->rd_att->attrs[count];
char *inhname = NameStr(inhattr->attname);
if (inhattr->attisdropped)
continue;
if (strcmp(inhname, colname) == 0)
{
result = inhattr->atttypid;
heap_close(rel, NoLock);
return result;
}
}
heap_close(rel, NoLock);
}
/* Look for column among existing columns (if ALTER TABLE case) */
if (OidIsValid(cxt->relOid))
{
HeapTuple atttuple;
atttuple = SearchSysCacheAttName(cxt->relOid, colname);
if (HeapTupleIsValid(atttuple))
{
result = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid;
ReleaseSysCache(atttuple);
return result;
}
}
elog(ERROR, "%s: column \"%s\" referenced in foreign key constraint does not exist",
cxt->stmtType, colname);
return InvalidOid; /* keep compiler quiet */
}
/*
* Preprocess a list of column constraint clauses
* to attach constraint attributes to their primary constraint nodes