mirror of
https://github.com/postgres/postgres.git
synced 2025-07-24 14:22:24 +03:00
Allow NOT NULL constraints to be added as NOT VALID
This allows them to be added without scanning the table, and validating
them afterwards without holding access exclusive lock on the table after
any violating rows have been deleted or fixed.
Doing ALTER TABLE ... SET NOT NULL for a column that has an invalid
not-null constraint validates that constraint. ALTER TABLE .. VALIDATE
CONSTRAINT is also supported. There are various checks on whether an
invalid constraint is allowed in a child table when the parent table has
a valid constraint; this should match what we do for enforced/not
enforced constraints.
pg_attribute.attnotnull is now only an indicator for whether a not-null
constraint exists for the column; whether it's valid or invalid must be
queried in pg_constraint. Applications can continue to query
pg_attribute.attnotnull as before, but now it's possible that NULL rows
are present in the column even when that's set to true.
For backend internal purposes, we cache the nullability status in
CompactAttribute->attnullability that each tuple descriptor carries
(replacing CompactAttribute.attnotnull, which was a mirror of
Form_pg_attribute.attnotnull). During the initial tuple descriptor
creation, based on the pg_attribute scan, we set this to UNRESTRICTED if
pg_attribute.attnotnull is false, or to UNKNOWN if it's true; then we
update the latter to VALID or INVALID depending on the pg_constraint
scan. This flag is also copied when tupledescs are copied.
Comparing tuple descs for equality must also compare the
CompactAttribute.attnullability flag and return false in case of a
mismatch.
pg_dump deals with these constraints by storing the OIDs of invalid
not-null constraints in a separate array, and running a query to obtain
their properties. The regular table creation SQL omits them entirely.
They are then dealt with in the same way as "separate" CHECK
constraints, and dumped after the data has been loaded. Because no
additional pg_dump infrastructure was required, we don't bump its
version number.
I decided not to bump catversion either, because the old catalog state
works perfectly in the new world. (Trying to run with new catalog state
and the old server version would likely run into issues, however.)
System catalogs do not support invalid not-null constraints (because
commit 14e87ffa5c
didn't allow them to have pg_constraint rows
anyway.)
Author: Rushabh Lathia <rushabh.lathia@gmail.com>
Author: Jian He <jian.universality@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Tested-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Discussion: https://postgr.es/m/CAGPqQf0KitkNack4F5CFkFi-9Dqvp29Ro=EpcWt=4_hs-Rt+bQ@mail.gmail.com
This commit is contained in:
@ -22,6 +22,7 @@
|
||||
#include "access/htup_details.h"
|
||||
#include "access/toast_compression.h"
|
||||
#include "access/tupdesc_details.h"
|
||||
#include "catalog/catalog.h"
|
||||
#include "catalog/pg_collation.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "common/hashfn.h"
|
||||
@ -74,7 +75,16 @@ populate_compact_attribute_internal(Form_pg_attribute src,
|
||||
dst->atthasmissing = src->atthasmissing;
|
||||
dst->attisdropped = src->attisdropped;
|
||||
dst->attgenerated = (src->attgenerated != '\0');
|
||||
dst->attnotnull = src->attnotnull;
|
||||
|
||||
/*
|
||||
* Assign nullability status for this column. Assuming that a constraint
|
||||
* exists, at this point we don't know if a not-null constraint is valid,
|
||||
* so we assign UNKNOWN unless the table is a catalog, in which case we
|
||||
* know it's valid.
|
||||
*/
|
||||
dst->attnullability = !src->attnotnull ? ATTNULLABLE_UNRESTRICTED :
|
||||
IsCatalogRelationOid(src->attrelid) ? ATTNULLABLE_VALID :
|
||||
ATTNULLABLE_UNKNOWN;
|
||||
|
||||
switch (src->attalign)
|
||||
{
|
||||
@ -144,9 +154,10 @@ verify_compact_attribute(TupleDesc tupdesc, int attnum)
|
||||
|
||||
/*
|
||||
* Make the attcacheoff match since it's been reset to -1 by
|
||||
* populate_compact_attribute_internal.
|
||||
* populate_compact_attribute_internal. Same with attnullability.
|
||||
*/
|
||||
tmp.attcacheoff = cattr->attcacheoff;
|
||||
tmp.attnullability = cattr->attnullability;
|
||||
|
||||
/* Check the freshly populated CompactAttribute matches the TupleDesc's */
|
||||
Assert(memcmp(&tmp, cattr, sizeof(CompactAttribute)) == 0);
|
||||
@ -333,8 +344,13 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
|
||||
desc->natts * sizeof(FormData_pg_attribute));
|
||||
|
||||
for (i = 0; i < desc->natts; i++)
|
||||
{
|
||||
populate_compact_attribute(desc, i);
|
||||
|
||||
TupleDescCompactAttr(desc, i)->attnullability =
|
||||
TupleDescCompactAttr(tupdesc, i)->attnullability;
|
||||
}
|
||||
|
||||
/* Copy the TupleConstr data structure, if any */
|
||||
if (constr)
|
||||
{
|
||||
@ -613,6 +629,24 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
|
||||
return false;
|
||||
if (attr1->attnotnull != attr2->attnotnull)
|
||||
return false;
|
||||
|
||||
/*
|
||||
* When the column has a not-null constraint, we also need to consider
|
||||
* its validity aspect, which only manifests in CompactAttribute->
|
||||
* attnullability, so verify that.
|
||||
*/
|
||||
if (attr1->attnotnull)
|
||||
{
|
||||
CompactAttribute *cattr1 = TupleDescCompactAttr(tupdesc1, i);
|
||||
CompactAttribute *cattr2 = TupleDescCompactAttr(tupdesc2, i);
|
||||
|
||||
Assert(cattr1->attnullability != ATTNULLABLE_UNKNOWN);
|
||||
Assert((cattr1->attnullability == ATTNULLABLE_UNKNOWN) ==
|
||||
(cattr2->attnullability == ATTNULLABLE_UNKNOWN));
|
||||
|
||||
if (cattr1->attnullability != cattr2->attnullability)
|
||||
return false;
|
||||
}
|
||||
if (attr1->atthasdef != attr2->atthasdef)
|
||||
return false;
|
||||
if (attr1->attidentity != attr2->attidentity)
|
||||
|
@ -2616,12 +2616,17 @@ AddRelationNewConstraints(Relation rel,
|
||||
errmsg("cannot add not-null constraint on system column \"%s\"",
|
||||
strVal(linitial(cdef->keys))));
|
||||
|
||||
Assert(cdef->initially_valid != cdef->skip_validation);
|
||||
|
||||
/*
|
||||
* If the column already has a not-null constraint, we don't want
|
||||
* to add another one; just adjust inheritance status as needed.
|
||||
* to add another one; adjust inheritance status as needed. This
|
||||
* also checks whether the existing constraint matches the
|
||||
* requested validity.
|
||||
*/
|
||||
if (AdjustNotNullInheritance(RelationGetRelid(rel), colnum,
|
||||
is_local, cdef->is_no_inherit))
|
||||
is_local, cdef->is_no_inherit,
|
||||
cdef->skip_validation))
|
||||
continue;
|
||||
|
||||
/*
|
||||
|
@ -576,8 +576,8 @@ ChooseConstraintName(const char *name1, const char *name2,
|
||||
|
||||
/*
|
||||
* Find and return a copy of the pg_constraint tuple that implements a
|
||||
* validated not-null constraint for the given column of the given relation.
|
||||
* If no such constraint exists, return NULL.
|
||||
* (possibly not valid) not-null constraint for the given column of the
|
||||
* given relation. If no such constraint exists, return NULL.
|
||||
*
|
||||
* XXX This would be easier if we had pg_attribute.notnullconstr with the OID
|
||||
* of the constraint that implements the not-null constraint for that column.
|
||||
@ -606,13 +606,11 @@ findNotNullConstraintAttnum(Oid relid, AttrNumber attnum)
|
||||
AttrNumber conkey;
|
||||
|
||||
/*
|
||||
* We're looking for a NOTNULL constraint that's marked validated,
|
||||
* with the column we're looking for as the sole element in conkey.
|
||||
* We're looking for a NOTNULL constraint with the column we're
|
||||
* looking for as the sole element in conkey.
|
||||
*/
|
||||
if (con->contype != CONSTRAINT_NOTNULL)
|
||||
continue;
|
||||
if (!con->convalidated)
|
||||
continue;
|
||||
|
||||
conkey = extractNotNullColumn(conTup);
|
||||
if (conkey != attnum)
|
||||
@ -630,9 +628,10 @@ findNotNullConstraintAttnum(Oid relid, AttrNumber attnum)
|
||||
}
|
||||
|
||||
/*
|
||||
* Find and return the pg_constraint tuple that implements a validated
|
||||
* not-null constraint for the given column of the given relation. If
|
||||
* no such column or no such constraint exists, return NULL.
|
||||
* Find and return a copy of the pg_constraint tuple that implements a
|
||||
* (possibly not valid) not-null constraint for the given column of the
|
||||
* given relation.
|
||||
* If no such column or no such constraint exists, return NULL.
|
||||
*/
|
||||
HeapTuple
|
||||
findNotNullConstraint(Oid relid, const char *colname)
|
||||
@ -723,15 +722,19 @@ extractNotNullColumn(HeapTuple constrTup)
|
||||
*
|
||||
* If no not-null constraint is found for the column, return false.
|
||||
* Caller can create one.
|
||||
*
|
||||
* If a constraint exists but the connoinherit flag is not what the caller
|
||||
* wants, throw an error about the incompatibility. Otherwise, we adjust
|
||||
* conislocal/coninhcount and return true.
|
||||
* In the latter case, if is_local is true we flip conislocal true, or do
|
||||
* nothing if it's already true; otherwise we increment coninhcount by 1.
|
||||
* wants, throw an error about the incompatibility. If the desired
|
||||
* constraint is valid but the existing constraint is not valid, also
|
||||
* throw an error about that (the opposite case is acceptable).
|
||||
*
|
||||
* If everything checks out, we adjust conislocal/coninhcount and return
|
||||
* true. If is_local is true we flip conislocal true, or do nothing if
|
||||
* it's already true; otherwise we increment coninhcount by 1.
|
||||
*/
|
||||
bool
|
||||
AdjustNotNullInheritance(Oid relid, AttrNumber attnum,
|
||||
bool is_local, bool is_no_inherit)
|
||||
bool is_local, bool is_no_inherit, bool is_notvalid)
|
||||
{
|
||||
HeapTuple tup;
|
||||
|
||||
@ -755,6 +758,17 @@ AdjustNotNullInheritance(Oid relid, AttrNumber attnum,
|
||||
errmsg("cannot change NO INHERIT status of NOT NULL constraint \"%s\" on relation \"%s\"",
|
||||
NameStr(conform->conname), get_rel_name(relid)));
|
||||
|
||||
/*
|
||||
* Throw an error if the existing constraint is NOT VALID and caller
|
||||
* wants a valid one.
|
||||
*/
|
||||
if (!is_notvalid && !conform->convalidated)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("incompatible NOT VALID constraint \"%s\" on relation \"%s\"",
|
||||
NameStr(conform->conname), get_rel_name(relid)),
|
||||
errhint("You will need to use ALTER TABLE ... VALIDATE CONSTRAINT to validate it."));
|
||||
|
||||
if (!is_local)
|
||||
{
|
||||
if (pg_add_s16_overflow(conform->coninhcount, 1,
|
||||
@ -832,7 +846,7 @@ RelationGetNotNullConstraints(Oid relid, bool cooked, bool include_noinh)
|
||||
cooked->attnum = colnum;
|
||||
cooked->expr = NULL;
|
||||
cooked->is_enforced = true;
|
||||
cooked->skip_validation = false;
|
||||
cooked->skip_validation = !conForm->convalidated;
|
||||
cooked->is_local = true;
|
||||
cooked->inhcount = 0;
|
||||
cooked->is_no_inherit = conForm->connoinherit;
|
||||
@ -852,7 +866,7 @@ RelationGetNotNullConstraints(Oid relid, bool cooked, bool include_noinh)
|
||||
constr->keys = list_make1(makeString(get_attname(relid, colnum,
|
||||
false)));
|
||||
constr->is_enforced = true;
|
||||
constr->skip_validation = false;
|
||||
constr->skip_validation = !conForm->convalidated;
|
||||
constr->initially_valid = true;
|
||||
constr->is_no_inherit = conForm->connoinherit;
|
||||
notnulls = lappend(notnulls, constr);
|
||||
|
@ -435,6 +435,9 @@ static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation
|
||||
static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
|
||||
char *constrName, HeapTuple contuple,
|
||||
bool recurse, bool recursing, LOCKMODE lockmode);
|
||||
static void QueueNNConstraintValidation(List **wqueue, Relation conrel, Relation rel,
|
||||
HeapTuple contuple, bool recurse, bool recursing,
|
||||
LOCKMODE lockmode);
|
||||
static int transformColumnNameList(Oid relId, List *colList,
|
||||
int16 *attnums, Oid *atttypids, Oid *attcollids);
|
||||
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
|
||||
@ -498,7 +501,7 @@ static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid)
|
||||
static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
|
||||
LOCKMODE lockmode);
|
||||
static void set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
|
||||
LOCKMODE lockmode);
|
||||
bool is_valid, bool queue_validation);
|
||||
static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel,
|
||||
char *constrname, char *colName,
|
||||
bool recurse, bool recursing,
|
||||
@ -1340,7 +1343,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
|
||||
nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints,
|
||||
old_notnulls);
|
||||
foreach_int(attrnum, nncols)
|
||||
set_attnotnull(NULL, rel, attrnum, NoLock);
|
||||
set_attnotnull(NULL, rel, attrnum, true, false);
|
||||
|
||||
ObjectAddressSet(address, RelationRelationId, relationId);
|
||||
|
||||
@ -2738,7 +2741,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
|
||||
|
||||
/*
|
||||
* Request attnotnull on columns that have a not-null constraint
|
||||
* that's not marked NO INHERIT.
|
||||
* that's not marked NO INHERIT (even if not valid).
|
||||
*/
|
||||
nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation),
|
||||
true, false);
|
||||
@ -6207,24 +6210,28 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
|
||||
{
|
||||
/*
|
||||
* If we are rebuilding the tuples OR if we added any new but not
|
||||
* verified not-null constraints, check all not-null constraints. This
|
||||
* is a bit of overkill but it minimizes risk of bugs.
|
||||
* verified not-null constraints, check all *valid* not-null
|
||||
* constraints. This is a bit of overkill but it minimizes risk of
|
||||
* bugs.
|
||||
*
|
||||
* notnull_attrs does *not* collect attribute numbers for not-null
|
||||
* constraints over virtual generated columns; instead, they are
|
||||
* collected in notnull_virtual_attrs.
|
||||
* notnull_attrs does *not* collect attribute numbers for valid
|
||||
* not-null constraints over virtual generated columns; instead, they
|
||||
* are collected in notnull_virtual_attrs for verification elsewhere.
|
||||
*/
|
||||
for (i = 0; i < newTupDesc->natts; i++)
|
||||
{
|
||||
Form_pg_attribute attr = TupleDescAttr(newTupDesc, i);
|
||||
CompactAttribute *attr = TupleDescCompactAttr(newTupDesc, i);
|
||||
|
||||
if (attr->attnotnull && !attr->attisdropped)
|
||||
if (attr->attnullability == ATTNULLABLE_VALID &&
|
||||
!attr->attisdropped)
|
||||
{
|
||||
if (attr->attgenerated != ATTRIBUTE_GENERATED_VIRTUAL)
|
||||
notnull_attrs = lappend_int(notnull_attrs, attr->attnum);
|
||||
Form_pg_attribute wholeatt = TupleDescAttr(newTupDesc, i);
|
||||
|
||||
if (wholeatt->attgenerated != ATTRIBUTE_GENERATED_VIRTUAL)
|
||||
notnull_attrs = lappend_int(notnull_attrs, wholeatt->attnum);
|
||||
else
|
||||
notnull_virtual_attrs = lappend_int(notnull_virtual_attrs,
|
||||
attr->attnum);
|
||||
wholeatt->attnum);
|
||||
}
|
||||
}
|
||||
if (notnull_attrs || notnull_virtual_attrs)
|
||||
@ -7809,18 +7816,23 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3
|
||||
* to verify it.
|
||||
* set_attnotnull
|
||||
* Helper to update/validate the pg_attribute status of a not-null
|
||||
* constraint
|
||||
*
|
||||
* When called to alter an existing table, 'wqueue' must be given so that we
|
||||
* can queue a check that existing tuples pass the constraint. When called
|
||||
* from table creation, 'wqueue' should be passed as NULL.
|
||||
* pg_attribute.attnotnull is set true, if it isn't already.
|
||||
* If queue_validation is true, also set up wqueue to validate the constraint.
|
||||
* wqueue may be given as NULL when validation is not needed (e.g., on table
|
||||
* creation).
|
||||
*/
|
||||
static void
|
||||
set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
|
||||
LOCKMODE lockmode)
|
||||
bool is_valid, bool queue_validation)
|
||||
{
|
||||
Form_pg_attribute attr;
|
||||
CompactAttribute *thisatt;
|
||||
|
||||
Assert(!queue_validation || wqueue);
|
||||
|
||||
CheckAlterTableIsSafe(rel);
|
||||
|
||||
@ -7844,8 +7856,11 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
|
||||
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
|
||||
attnum, RelationGetRelid(rel));
|
||||
|
||||
thisatt = TupleDescCompactAttr(RelationGetDescr(rel), attnum - 1);
|
||||
thisatt->attnullability = ATTNULLABLE_VALID;
|
||||
|
||||
attr = (Form_pg_attribute) GETSTRUCT(tuple);
|
||||
Assert(!attr->attnotnull);
|
||||
|
||||
attr->attnotnull = true;
|
||||
CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
|
||||
|
||||
@ -7853,7 +7868,8 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
|
||||
* If the nullness isn't already proven by validated constraints, have
|
||||
* ALTER TABLE phase 3 test for it.
|
||||
*/
|
||||
if (wqueue && !NotNullImpliedByRelConstraints(rel, attr))
|
||||
if (queue_validation && wqueue &&
|
||||
!NotNullImpliedByRelConstraints(rel, attr))
|
||||
{
|
||||
AlteredTableInfo *tab;
|
||||
|
||||
@ -7866,6 +7882,10 @@ set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
|
||||
table_close(attr_rel, RowExclusiveLock);
|
||||
heap_freetuple(tuple);
|
||||
}
|
||||
else
|
||||
{
|
||||
CacheInvalidateRelcache(rel);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -7951,6 +7971,15 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
|
||||
conForm->conislocal = true;
|
||||
changed = true;
|
||||
}
|
||||
else if (!conForm->convalidated)
|
||||
{
|
||||
/*
|
||||
* Flip attnotnull and convalidated, and also validate the
|
||||
* constraint.
|
||||
*/
|
||||
return ATExecValidateConstraint(wqueue, rel, NameStr(conForm->conname),
|
||||
recurse, recursing, lockmode);
|
||||
}
|
||||
|
||||
if (changed)
|
||||
{
|
||||
@ -8013,8 +8042,8 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
|
||||
InvokeObjectPostAlterHook(RelationRelationId,
|
||||
RelationGetRelid(rel), attnum);
|
||||
|
||||
/* Mark pg_attribute.attnotnull for the column */
|
||||
set_attnotnull(wqueue, rel, attnum, lockmode);
|
||||
/* Mark pg_attribute.attnotnull for the column and queue validation */
|
||||
set_attnotnull(wqueue, rel, attnum, true, true);
|
||||
|
||||
/*
|
||||
* Recurse to propagate the constraint to children that don't have one.
|
||||
@ -9417,7 +9446,6 @@ ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
|
||||
bool recurse, LOCKMODE lockmode,
|
||||
AlterTableUtilityContext *context)
|
||||
{
|
||||
ListCell *lc;
|
||||
Constraint *pkconstr;
|
||||
|
||||
pkconstr = castNode(Constraint, cmd->def);
|
||||
@ -9436,33 +9464,73 @@ ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
|
||||
lockmode);
|
||||
foreach_oid(childrelid, children)
|
||||
{
|
||||
foreach(lc, pkconstr->keys)
|
||||
foreach_node(String, attname, pkconstr->keys)
|
||||
{
|
||||
HeapTuple tup;
|
||||
Form_pg_attribute attrForm;
|
||||
char *attname = strVal(lfirst(lc));
|
||||
|
||||
tup = SearchSysCacheAttName(childrelid, attname);
|
||||
tup = SearchSysCacheAttName(childrelid, strVal(attname));
|
||||
if (!tup)
|
||||
elog(ERROR, "cache lookup failed for attribute %s of relation %u",
|
||||
attname, childrelid);
|
||||
strVal(attname), childrelid);
|
||||
attrForm = (Form_pg_attribute) GETSTRUCT(tup);
|
||||
if (!attrForm->attnotnull)
|
||||
ereport(ERROR,
|
||||
errmsg("column \"%s\" of table \"%s\" is not marked NOT NULL",
|
||||
attname, get_rel_name(childrelid)));
|
||||
strVal(attname), get_rel_name(childrelid)));
|
||||
ReleaseSysCache(tup);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Insert not-null constraints in the queue for the PK columns */
|
||||
foreach(lc, pkconstr->keys)
|
||||
/* Verify that columns are not-null, or request that they be made so */
|
||||
foreach_node(String, column, pkconstr->keys)
|
||||
{
|
||||
AlterTableCmd *newcmd;
|
||||
Constraint *nnconstr;
|
||||
HeapTuple tuple;
|
||||
|
||||
nnconstr = makeNotNullConstraint(lfirst(lc));
|
||||
/*
|
||||
* First check if a suitable constraint exists. If it does, we don't
|
||||
* need to request another one. We do need to bail out if it's not
|
||||
* valid, though.
|
||||
*/
|
||||
tuple = findNotNullConstraint(RelationGetRelid(rel), strVal(column));
|
||||
if (tuple != NULL)
|
||||
{
|
||||
Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple);
|
||||
|
||||
/* a NO INHERIT constraint is no good */
|
||||
if (conForm->connoinherit)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot create primary key on column \"%s\"",
|
||||
strVal(column)),
|
||||
/*- translator: third %s is a constraint characteristic such as NOT VALID */
|
||||
errdetail("The constraint \"%s\" on column \"%s\", marked %s, is incompatible with a primary key.",
|
||||
NameStr(conForm->conname), strVal(column), "NO INHERIT"),
|
||||
errhint("You will need to make it inheritable using %s.",
|
||||
"ALTER TABLE ... ALTER CONSTRAINT ... INHERIT"));
|
||||
|
||||
/* an unvalidated constraint is no good */
|
||||
if (!conForm->convalidated)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot create primary key on column \"%s\"",
|
||||
strVal(column)),
|
||||
/*- translator: third %s is a constraint characteristic such as NOT VALID */
|
||||
errdetail("The constraint \"%s\" on column \"%s\", marked %s, is incompatible with a primary key.",
|
||||
NameStr(conForm->conname), strVal(column), "NOT VALID"),
|
||||
errhint("You will need to validate it using %s.",
|
||||
"ALTER TABLE ... VALIDATE CONSTRAINT"));
|
||||
|
||||
/* All good with this one; don't request another */
|
||||
heap_freetuple(tuple);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* This column is not already not-null, so add it to the queue */
|
||||
nnconstr = makeNotNullConstraint(column);
|
||||
|
||||
newcmd = makeNode(AlterTableCmd);
|
||||
newcmd->subtype = AT_AddConstraint;
|
||||
@ -9836,11 +9904,15 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
|
||||
constr->conname = ccon->name;
|
||||
|
||||
/*
|
||||
* If adding a not-null constraint, set the pg_attribute flag and tell
|
||||
* phase 3 to verify existing rows, if needed.
|
||||
* If adding a valid not-null constraint, set the pg_attribute flag
|
||||
* and tell phase 3 to verify existing rows, if needed. For an
|
||||
* invalid constraint, just set attnotnull, without queueing
|
||||
* verification.
|
||||
*/
|
||||
if (constr->contype == CONSTR_NOTNULL)
|
||||
set_attnotnull(wqueue, rel, ccon->attnum, lockmode);
|
||||
set_attnotnull(wqueue, rel, ccon->attnum,
|
||||
!constr->skip_validation,
|
||||
!constr->skip_validation);
|
||||
|
||||
ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
|
||||
}
|
||||
@ -12811,11 +12883,12 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
|
||||
|
||||
con = (Form_pg_constraint) GETSTRUCT(tuple);
|
||||
if (con->contype != CONSTRAINT_FOREIGN &&
|
||||
con->contype != CONSTRAINT_CHECK)
|
||||
con->contype != CONSTRAINT_CHECK &&
|
||||
con->contype != CONSTRAINT_NOTNULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
|
||||
constrName, RelationGetRelationName(rel))));
|
||||
errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key, check, or not-null constraint",
|
||||
constrName, RelationGetRelationName(rel)));
|
||||
|
||||
if (!con->conenforced)
|
||||
ereport(ERROR,
|
||||
@ -12833,6 +12906,11 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
|
||||
QueueCheckConstraintValidation(wqueue, conrel, rel, constrName,
|
||||
tuple, recurse, recursing, lockmode);
|
||||
}
|
||||
else if (con->contype == CONSTRAINT_NOTNULL)
|
||||
{
|
||||
QueueNNConstraintValidation(wqueue, conrel, rel,
|
||||
tuple, recurse, recursing, lockmode);
|
||||
}
|
||||
|
||||
ObjectAddressSet(address, ConstraintRelationId, con->oid);
|
||||
}
|
||||
@ -13049,6 +13127,109 @@ QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
|
||||
heap_freetuple(copyTuple);
|
||||
}
|
||||
|
||||
/*
|
||||
* QueueNNConstraintValidation
|
||||
*
|
||||
* Add an entry to the wqueue to validate the given not-null constraint in
|
||||
* Phase 3 and update the convalidated field in the pg_constraint catalog for
|
||||
* the specified relation and all its inheriting children.
|
||||
*/
|
||||
static void
|
||||
QueueNNConstraintValidation(List **wqueue, Relation conrel, Relation rel,
|
||||
HeapTuple contuple, bool recurse, bool recursing,
|
||||
LOCKMODE lockmode)
|
||||
{
|
||||
Form_pg_constraint con;
|
||||
AlteredTableInfo *tab;
|
||||
HeapTuple copyTuple;
|
||||
Form_pg_constraint copy_con;
|
||||
List *children = NIL;
|
||||
AttrNumber attnum;
|
||||
char *colname;
|
||||
|
||||
con = (Form_pg_constraint) GETSTRUCT(contuple);
|
||||
Assert(con->contype == CONSTRAINT_NOTNULL);
|
||||
|
||||
attnum = extractNotNullColumn(contuple);
|
||||
|
||||
/*
|
||||
* If we're recursing, we've already done this for parent, so skip it.
|
||||
* Also, if the constraint is a NO INHERIT constraint, we shouldn't try to
|
||||
* look for it in the children.
|
||||
*
|
||||
* We recurse before validating on the parent, to reduce risk of
|
||||
* deadlocks.
|
||||
*/
|
||||
if (!recursing && !con->connoinherit)
|
||||
children = find_all_inheritors(RelationGetRelid(rel), lockmode, NULL);
|
||||
|
||||
colname = get_attname(RelationGetRelid(rel), attnum, false);
|
||||
foreach_oid(childoid, children)
|
||||
{
|
||||
Relation childrel;
|
||||
HeapTuple contup;
|
||||
Form_pg_constraint childcon;
|
||||
char *conname;
|
||||
|
||||
if (childoid == RelationGetRelid(rel))
|
||||
continue;
|
||||
|
||||
/*
|
||||
* If we are told not to recurse, there had better not be any child
|
||||
* tables, because we can't mark the constraint on the parent valid
|
||||
* unless it is valid for all child tables.
|
||||
*/
|
||||
if (!recurse)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("constraint must be validated on child tables too"));
|
||||
|
||||
/*
|
||||
* The column on child might have a different attnum, so search by
|
||||
* column name.
|
||||
*/
|
||||
contup = findNotNullConstraint(childoid, colname);
|
||||
if (!contup)
|
||||
elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation \"%s\"",
|
||||
colname, get_rel_name(childoid));
|
||||
childcon = (Form_pg_constraint) GETSTRUCT(contup);
|
||||
if (childcon->convalidated)
|
||||
continue;
|
||||
|
||||
/* find_all_inheritors already got lock */
|
||||
childrel = table_open(childoid, NoLock);
|
||||
conname = pstrdup(NameStr(childcon->conname));
|
||||
|
||||
/* XXX improve ATExecValidateConstraint API to avoid double search */
|
||||
ATExecValidateConstraint(wqueue, childrel, conname,
|
||||
false, true, lockmode);
|
||||
table_close(childrel, NoLock);
|
||||
}
|
||||
|
||||
/* Set attnotnull appropriately without queueing another validation */
|
||||
set_attnotnull(NULL, rel, attnum, true, false);
|
||||
|
||||
tab = ATGetQueueEntry(wqueue, rel);
|
||||
tab->verify_new_notnull = true;
|
||||
|
||||
/*
|
||||
* Invalidate relcache so that others see the new validated constraint.
|
||||
*/
|
||||
CacheInvalidateRelcache(rel);
|
||||
|
||||
/*
|
||||
* Now update the catalogs, while we have the door open.
|
||||
*/
|
||||
copyTuple = heap_copytuple(contuple);
|
||||
copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
|
||||
copy_con->convalidated = true;
|
||||
CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple);
|
||||
|
||||
InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
|
||||
|
||||
heap_freetuple(copyTuple);
|
||||
}
|
||||
|
||||
/*
|
||||
* transformColumnNameList - transform list of column names
|
||||
*
|
||||
@ -19770,17 +19951,19 @@ PartConstraintImpliedByRelConstraint(Relation scanrel,
|
||||
|
||||
for (i = 1; i <= natts; i++)
|
||||
{
|
||||
Form_pg_attribute att = TupleDescAttr(scanrel->rd_att, i - 1);
|
||||
CompactAttribute *att = TupleDescCompactAttr(scanrel->rd_att, i - 1);
|
||||
|
||||
if (att->attnotnull && !att->attisdropped)
|
||||
/* invalid not-null constraint must be ignored here */
|
||||
if (att->attnullability == ATTNULLABLE_VALID && !att->attisdropped)
|
||||
{
|
||||
Form_pg_attribute wholeatt = TupleDescAttr(scanrel->rd_att, i - 1);
|
||||
NullTest *ntest = makeNode(NullTest);
|
||||
|
||||
ntest->arg = (Expr *) makeVar(1,
|
||||
i,
|
||||
att->atttypid,
|
||||
att->atttypmod,
|
||||
att->attcollation,
|
||||
wholeatt->atttypid,
|
||||
wholeatt->atttypmod,
|
||||
wholeatt->attcollation,
|
||||
0);
|
||||
ntest->nulltesttype = IS_NOT_NULL;
|
||||
|
||||
|
@ -123,7 +123,7 @@ slot_compile_deform(LLVMJitContext *context, TupleDesc desc,
|
||||
* combination of attisdropped && attnotnull combination shouldn't
|
||||
* exist.
|
||||
*/
|
||||
if (att->attnotnull &&
|
||||
if (att->attnullability == ATTNULLABLE_VALID &&
|
||||
!att->atthasmissing &&
|
||||
!att->attisdropped)
|
||||
guaranteed_column_number = attnum;
|
||||
@ -438,7 +438,7 @@ slot_compile_deform(LLVMJitContext *context, TupleDesc desc,
|
||||
* into account, because if they're present the heaptuple's natts
|
||||
* would have indicated that a slot_getmissingattrs() is needed.
|
||||
*/
|
||||
if (!att->attnotnull)
|
||||
if (att->attnullability != ATTNULLABLE_VALID)
|
||||
{
|
||||
LLVMBasicBlockRef b_ifnotnull;
|
||||
LLVMBasicBlockRef b_ifnull;
|
||||
@ -604,7 +604,8 @@ slot_compile_deform(LLVMJitContext *context, TupleDesc desc,
|
||||
known_alignment = -1;
|
||||
attguaranteedalign = false;
|
||||
}
|
||||
else if (att->attnotnull && attguaranteedalign && known_alignment >= 0)
|
||||
else if (att->attnullability == ATTNULLABLE_VALID &&
|
||||
attguaranteedalign && known_alignment >= 0)
|
||||
{
|
||||
/*
|
||||
* If the offset to the column was previously known, a NOT NULL &
|
||||
@ -614,7 +615,8 @@ slot_compile_deform(LLVMJitContext *context, TupleDesc desc,
|
||||
Assert(att->attlen > 0);
|
||||
known_alignment += att->attlen;
|
||||
}
|
||||
else if (att->attnotnull && (att->attlen % alignto) == 0)
|
||||
else if (att->attnullability == ATTNULLABLE_VALID &&
|
||||
(att->attlen % alignto) == 0)
|
||||
{
|
||||
/*
|
||||
* After a NOT NULL fixed-width column with a length that is a
|
||||
|
@ -177,7 +177,9 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
|
||||
{
|
||||
CompactAttribute *attr = TupleDescCompactAttr(relation->rd_att, i);
|
||||
|
||||
if (attr->attnotnull)
|
||||
Assert(attr->attnullability != ATTNULLABLE_UNKNOWN);
|
||||
|
||||
if (attr->attnullability == ATTNULLABLE_VALID)
|
||||
{
|
||||
rel->notnullattnums = bms_add_member(rel->notnullattnums,
|
||||
i + 1);
|
||||
@ -1251,6 +1253,7 @@ get_relation_data_width(Oid relid, int32 *attr_widths)
|
||||
* get_relation_constraints
|
||||
*
|
||||
* Retrieve the applicable constraint expressions of the given relation.
|
||||
* Only constraints that have been validated are considered.
|
||||
*
|
||||
* Returns a List (possibly empty) of constraint expressions. Each one
|
||||
* has been canonicalized, and its Vars are changed to have the varno
|
||||
@ -1351,17 +1354,18 @@ get_relation_constraints(PlannerInfo *root,
|
||||
|
||||
for (i = 1; i <= natts; i++)
|
||||
{
|
||||
Form_pg_attribute att = TupleDescAttr(relation->rd_att, i - 1);
|
||||
CompactAttribute *att = TupleDescCompactAttr(relation->rd_att, i - 1);
|
||||
|
||||
if (att->attnotnull && !att->attisdropped)
|
||||
if (att->attnullability == ATTNULLABLE_VALID && !att->attisdropped)
|
||||
{
|
||||
Form_pg_attribute wholeatt = TupleDescAttr(relation->rd_att, i - 1);
|
||||
NullTest *ntest = makeNode(NullTest);
|
||||
|
||||
ntest->arg = (Expr *) makeVar(varno,
|
||||
i,
|
||||
att->atttypid,
|
||||
att->atttypmod,
|
||||
att->attcollation,
|
||||
wholeatt->atttypid,
|
||||
wholeatt->atttypmod,
|
||||
wholeatt->attcollation,
|
||||
0);
|
||||
ntest->nulltesttype = IS_NOT_NULL;
|
||||
|
||||
|
@ -4217,11 +4217,10 @@ ConstraintElem:
|
||||
n->contype = CONSTR_NOTNULL;
|
||||
n->location = @1;
|
||||
n->keys = list_make1(makeString($3));
|
||||
/* no NOT VALID support yet */
|
||||
processCASbits($4, @4, "NOT NULL",
|
||||
NULL, NULL, NULL, NULL,
|
||||
NULL, NULL, NULL, &n->skip_validation,
|
||||
&n->is_no_inherit, yyscanner);
|
||||
n->initially_valid = true;
|
||||
n->initially_valid = !n->skip_validation;
|
||||
$$ = (Node *) n;
|
||||
}
|
||||
| UNIQUE opt_unique_null_treatment '(' columnList opt_without_overlaps ')' opt_c_include opt_definition OptConsTableSpace
|
||||
|
73
src/backend/utils/cache/relcache.c
vendored
73
src/backend/utils/cache/relcache.c
vendored
@ -307,7 +307,7 @@ static TupleDesc GetPgClassDescriptor(void);
|
||||
static TupleDesc GetPgIndexDescriptor(void);
|
||||
static void AttrDefaultFetch(Relation relation, int ndef);
|
||||
static int AttrDefaultCmp(const void *a, const void *b);
|
||||
static void CheckConstraintFetch(Relation relation);
|
||||
static void CheckNNConstraintFetch(Relation relation);
|
||||
static int CheckConstraintCmp(const void *a, const void *b);
|
||||
static void InitIndexAmRoutine(Relation relation);
|
||||
static void IndexSupportInitialize(oidvector *indclass,
|
||||
@ -684,6 +684,8 @@ RelationBuildTupleDesc(Relation relation)
|
||||
attrmiss ||
|
||||
relation->rd_rel->relchecks > 0)
|
||||
{
|
||||
bool is_catalog = IsCatalogRelation(relation);
|
||||
|
||||
relation->rd_att->constr = constr;
|
||||
|
||||
if (ndef > 0) /* DEFAULTs */
|
||||
@ -693,9 +695,33 @@ RelationBuildTupleDesc(Relation relation)
|
||||
|
||||
constr->missing = attrmiss;
|
||||
|
||||
if (relation->rd_rel->relchecks > 0) /* CHECKs */
|
||||
CheckConstraintFetch(relation);
|
||||
else
|
||||
/* CHECK and NOT NULLs */
|
||||
if (relation->rd_rel->relchecks > 0 ||
|
||||
(!is_catalog && constr->has_not_null))
|
||||
CheckNNConstraintFetch(relation);
|
||||
|
||||
/*
|
||||
* Any not-null constraint that wasn't marked invalid by
|
||||
* CheckNNConstraintFetch must necessarily be valid; make it so in the
|
||||
* CompactAttribute array.
|
||||
*/
|
||||
if (!is_catalog)
|
||||
{
|
||||
for (int i = 0; i < relation->rd_rel->relnatts; i++)
|
||||
{
|
||||
CompactAttribute *attr;
|
||||
|
||||
attr = TupleDescCompactAttr(relation->rd_att, i);
|
||||
|
||||
if (attr->attnullability == ATTNULLABLE_UNKNOWN)
|
||||
attr->attnullability = ATTNULLABLE_VALID;
|
||||
else
|
||||
Assert(attr->attnullability == ATTNULLABLE_INVALID ||
|
||||
attr->attnullability == ATTNULLABLE_UNRESTRICTED);
|
||||
}
|
||||
}
|
||||
|
||||
if (relation->rd_rel->relchecks == 0)
|
||||
constr->num_check = 0;
|
||||
}
|
||||
else
|
||||
@ -3575,6 +3601,14 @@ RelationBuildLocalRelation(const char *relname,
|
||||
datt->attnotnull = satt->attnotnull;
|
||||
has_not_null |= satt->attnotnull;
|
||||
populate_compact_attribute(rel->rd_att, i);
|
||||
|
||||
if (satt->attnotnull)
|
||||
{
|
||||
CompactAttribute *scatt = TupleDescCompactAttr(tupDesc, i);
|
||||
CompactAttribute *dcatt = TupleDescCompactAttr(rel->rd_att, i);
|
||||
|
||||
dcatt->attnullability = scatt->attnullability;
|
||||
}
|
||||
}
|
||||
|
||||
if (has_not_null)
|
||||
@ -4533,13 +4567,14 @@ AttrDefaultCmp(const void *a, const void *b)
|
||||
}
|
||||
|
||||
/*
|
||||
* Load any check constraints for the relation.
|
||||
* Load any check constraints for the relation, and update not-null validity
|
||||
* of invalid constraints.
|
||||
*
|
||||
* As with defaults, if we don't find the expected number of them, just warn
|
||||
* here. The executor should throw an error if an INSERT/UPDATE is attempted.
|
||||
*/
|
||||
static void
|
||||
CheckConstraintFetch(Relation relation)
|
||||
CheckNNConstraintFetch(Relation relation)
|
||||
{
|
||||
ConstrCheck *check;
|
||||
int ncheck = relation->rd_rel->relchecks;
|
||||
@ -4570,7 +4605,31 @@ CheckConstraintFetch(Relation relation)
|
||||
Datum val;
|
||||
bool isnull;
|
||||
|
||||
/* We want check constraints only */
|
||||
/*
|
||||
* If this is a not-null constraint, then only look at it if it's
|
||||
* invalid, and if so, mark the TupleDesc entry as known invalid.
|
||||
* Otherwise move on. We'll mark any remaining columns that are still
|
||||
* in UNKNOWN state as known valid later. This allows us not to have
|
||||
* to extract the attnum from this constraint tuple in the vast
|
||||
* majority of cases.
|
||||
*/
|
||||
if (conform->contype == CONSTRAINT_NOTNULL)
|
||||
{
|
||||
if (!conform->convalidated)
|
||||
{
|
||||
AttrNumber attnum;
|
||||
|
||||
attnum = extractNotNullColumn(htup);
|
||||
Assert(relation->rd_att->compact_attrs[attnum - 1].attnullability ==
|
||||
ATTNULLABLE_UNKNOWN);
|
||||
relation->rd_att->compact_attrs[attnum - 1].attnullability =
|
||||
ATTNULLABLE_INVALID;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
/* For what follows, consider check constraints only */
|
||||
if (conform->contype != CONSTRAINT_CHECK)
|
||||
continue;
|
||||
|
||||
|
Reference in New Issue
Block a user