1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-27 00:12:01 +03:00

Catalog domain not-null constraints

This applies the explicit catalog representation of not-null
constraints introduced by b0e96f3119 for table constraints also to
domain not-null constraints.

Reviewed-by: Aleksander Alekseev <aleksander@timescale.com>
Reviewed-by: jian he <jian.universality@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/9ec24d7b-633d-463a-84c6-7acff769c9e8%40eisentraut.org
This commit is contained in:
Peter Eisentraut
2024-03-20 09:29:08 +01:00
parent c9c260decd
commit e5da0fe3c2
10 changed files with 386 additions and 115 deletions

View File

@@ -126,15 +126,19 @@ static Oid findTypeSubscriptingFunction(List *procname, Oid typeOid);
static Oid findRangeSubOpclass(List *opcname, Oid subtype);
static Oid findRangeCanonicalFunction(List *procname, Oid typeOid);
static Oid findRangeSubtypeDiffFunction(List *procname, Oid subtype);
static void validateDomainConstraint(Oid domainoid, char *ccbin);
static void validateDomainCheckConstraint(Oid domainoid, const char *ccbin);
static void validateDomainNotNullConstraint(Oid domainoid);
static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode);
static void checkEnumOwner(HeapTuple tup);
static char *domainAddConstraint(Oid domainOid, Oid domainNamespace,
Oid baseTypeOid,
int typMod, Constraint *constr,
const char *domainName, ObjectAddress *constrAddr);
static char *domainAddCheckConstraint(Oid domainOid, Oid domainNamespace,
Oid baseTypeOid,
int typMod, Constraint *constr,
const char *domainName, ObjectAddress *constrAddr);
static Node *replace_domain_constraint_value(ParseState *pstate,
ColumnRef *cref);
static void domainAddNotNullConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
int typMod, Constraint *constr,
const char *domainName, ObjectAddress *constrAddr);
static void AlterTypeRecurse(Oid typeOid, bool isImplicitArray,
HeapTuple tup, Relation catalog,
AlterTypeRecurseParams *atparams);
@@ -1105,9 +1109,15 @@ DefineDomain(CreateDomainStmt *stmt)
switch (constr->contype)
{
case CONSTR_CHECK:
domainAddConstraint(address.objectId, domainNamespace,
basetypeoid, basetypeMod,
constr, domainName, NULL);
domainAddCheckConstraint(address.objectId, domainNamespace,
basetypeoid, basetypeMod,
constr, domainName, NULL);
break;
case CONSTR_NOTNULL:
domainAddNotNullConstraint(address.objectId, domainNamespace,
basetypeoid, basetypeMod,
constr, domainName, NULL);
break;
/* Other constraint types were fully processed above */
@@ -2723,66 +2733,32 @@ AlterDomainNotNull(List *names, bool notNull)
return address;
}
/* Adding a NOT NULL constraint requires checking existing columns */
if (notNull)
{
List *rels;
ListCell *rt;
Constraint *constr;
/* Fetch relation list with attributes based on this domain */
/* ShareLock is sufficient to prevent concurrent data changes */
constr = makeNode(Constraint);
constr->contype = CONSTR_NOTNULL;
constr->initially_valid = true;
constr->location = -1;
rels = get_rels_with_domain(domainoid, ShareLock);
domainAddNotNullConstraint(domainoid, typTup->typnamespace,
typTup->typbasetype, typTup->typtypmod,
constr, NameStr(typTup->typname), NULL);
foreach(rt, rels)
{
RelToCheck *rtc = (RelToCheck *) lfirst(rt);
Relation testrel = rtc->rel;
TupleDesc tupdesc = RelationGetDescr(testrel);
TupleTableSlot *slot;
TableScanDesc scan;
Snapshot snapshot;
validateDomainNotNullConstraint(domainoid);
}
else
{
HeapTuple conTup;
ObjectAddress conobj;
/* Scan all tuples in this relation */
snapshot = RegisterSnapshot(GetLatestSnapshot());
scan = table_beginscan(testrel, snapshot, 0, NULL);
slot = table_slot_create(testrel, NULL);
while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
int i;
conTup = findDomainNotNullConstraint(domainoid);
if (conTup == NULL)
elog(ERROR, "could not find not-null constraint on domain \"%s\"", NameStr(typTup->typname));
/* Test attributes that are of the domain */
for (i = 0; i < rtc->natts; i++)
{
int attnum = rtc->atts[i];
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
if (slot_attisnull(slot, attnum))
{
/*
* In principle the auxiliary information for this
* error should be errdatatype(), but errtablecol()
* seems considerably more useful in practice. Since
* this code only executes in an ALTER DOMAIN command,
* the client should already know which domain is in
* question.
*/
ereport(ERROR,
(errcode(ERRCODE_NOT_NULL_VIOLATION),
errmsg("column \"%s\" of table \"%s\" contains null values",
NameStr(attr->attname),
RelationGetRelationName(testrel)),
errtablecol(testrel, attnum)));
}
}
}
ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);
UnregisterSnapshot(snapshot);
/* Close each rel after processing, but keep lock */
table_close(testrel, NoLock);
}
ObjectAddressSet(conobj, ConstraintRelationId, ((Form_pg_constraint) GETSTRUCT(conTup))->oid);
performDeletion(&conobj, DROP_RESTRICT, 0);
}
/*
@@ -2863,10 +2839,17 @@ AlterDomainDropConstraint(List *names, const char *constrName,
/* There can be at most one matching row */
if ((contup = systable_getnext(conscan)) != NULL)
{
Form_pg_constraint construct = (Form_pg_constraint) GETSTRUCT(contup);
ObjectAddress conobj;
if (construct->contype == CONSTRAINT_NOTNULL)
{
((Form_pg_type) GETSTRUCT(tup))->typnotnull = false;
CatalogTupleUpdate(rel, &tup->t_self, tup);
}
conobj.classId = ConstraintRelationId;
conobj.objectId = ((Form_pg_constraint) GETSTRUCT(contup))->oid;
conobj.objectId = construct->oid;
conobj.objectSubId = 0;
performDeletion(&conobj, behavior, 0);
@@ -2921,7 +2904,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint,
Form_pg_type typTup;
Constraint *constr;
char *ccbin;
ObjectAddress address;
ObjectAddress address = InvalidObjectAddress;
/* Make a TypeName so we can use standard type lookup machinery */
typename = makeTypeNameFromNameList(names);
@@ -2947,6 +2930,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint,
switch (constr->contype)
{
case CONSTR_CHECK:
case CONSTR_NOTNULL:
/* processed below */
break;
@@ -2989,29 +2973,52 @@ AlterDomainAddConstraint(List *names, Node *newConstraint,
break;
}
/*
* Since all other constraint types throw errors, this must be a check
* constraint. First, process the constraint expression and add an entry
* to pg_constraint.
*/
if (constr->contype == CONSTR_CHECK)
{
/*
* First, process the constraint expression and add an entry to
* pg_constraint.
*/
ccbin = domainAddConstraint(domainoid, typTup->typnamespace,
typTup->typbasetype, typTup->typtypmod,
constr, NameStr(typTup->typname), constrAddr);
ccbin = domainAddCheckConstraint(domainoid, typTup->typnamespace,
typTup->typbasetype, typTup->typtypmod,
constr, NameStr(typTup->typname), constrAddr);
/*
* If requested to validate the constraint, test all values stored in the
* attributes based on the domain the constraint is being added to.
*/
if (!constr->skip_validation)
validateDomainConstraint(domainoid, ccbin);
/*
* We must send out an sinval message for the domain, to ensure that any
* dependent plans get rebuilt. Since this command doesn't change the
* domain's pg_type row, that won't happen automatically; do it manually.
*/
CacheInvalidateHeapTuple(typrel, tup, NULL);
/*
* If requested to validate the constraint, test all values stored in
* the attributes based on the domain the constraint is being added
* to.
*/
if (!constr->skip_validation)
validateDomainCheckConstraint(domainoid, ccbin);
/*
* We must send out an sinval message for the domain, to ensure that
* any dependent plans get rebuilt. Since this command doesn't change
* the domain's pg_type row, that won't happen automatically; do it
* manually.
*/
CacheInvalidateHeapTuple(typrel, tup, NULL);
}
else if (constr->contype == CONSTR_NOTNULL)
{
/* Is the domain already set NOT NULL? */
if (typTup->typnotnull)
{
table_close(typrel, RowExclusiveLock);
return address;
}
domainAddNotNullConstraint(domainoid, typTup->typnamespace,
typTup->typbasetype, typTup->typtypmod,
constr, NameStr(typTup->typname), constrAddr);
if (!constr->skip_validation)
validateDomainNotNullConstraint(domainoid);
typTup->typnotnull = true;
CatalogTupleUpdate(typrel, &tup->t_self, tup);
}
ObjectAddressSet(address, TypeRelationId, domainoid);
@@ -3096,7 +3103,7 @@ AlterDomainValidateConstraint(List *names, const char *constrName)
val = SysCacheGetAttrNotNull(CONSTROID, tuple, Anum_pg_constraint_conbin);
conbin = TextDatumGetCString(val);
validateDomainConstraint(domainoid, conbin);
validateDomainCheckConstraint(domainoid, conbin);
/*
* Now update the catalog, while we have the door open.
@@ -3122,8 +3129,76 @@ AlterDomainValidateConstraint(List *names, const char *constrName)
return address;
}
/*
* Verify that all columns currently using the domain are not null.
*/
static void
validateDomainConstraint(Oid domainoid, char *ccbin)
validateDomainNotNullConstraint(Oid domainoid)
{
List *rels;
ListCell *rt;
/* Fetch relation list with attributes based on this domain */
/* ShareLock is sufficient to prevent concurrent data changes */
rels = get_rels_with_domain(domainoid, ShareLock);
foreach(rt, rels)
{
RelToCheck *rtc = (RelToCheck *) lfirst(rt);
Relation testrel = rtc->rel;
TupleDesc tupdesc = RelationGetDescr(testrel);
TupleTableSlot *slot;
TableScanDesc scan;
Snapshot snapshot;
/* Scan all tuples in this relation */
snapshot = RegisterSnapshot(GetLatestSnapshot());
scan = table_beginscan(testrel, snapshot, 0, NULL);
slot = table_slot_create(testrel, NULL);
while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
int i;
/* Test attributes that are of the domain */
for (i = 0; i < rtc->natts; i++)
{
int attnum = rtc->atts[i];
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
if (slot_attisnull(slot, attnum))
{
/*
* In principle the auxiliary information for this error
* should be errdatatype(), but errtablecol() seems
* considerably more useful in practice. Since this code
* only executes in an ALTER DOMAIN command, the client
* should already know which domain is in question.
*/
ereport(ERROR,
(errcode(ERRCODE_NOT_NULL_VIOLATION),
errmsg("column \"%s\" of table \"%s\" contains null values",
NameStr(attr->attname),
RelationGetRelationName(testrel)),
errtablecol(testrel, attnum)));
}
}
}
ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);
UnregisterSnapshot(snapshot);
/* Close each rel after processing, but keep lock */
table_close(testrel, NoLock);
}
}
/*
* Verify that all columns currently using the domain satisfy the given check
* constraint expression.
*/
static void
validateDomainCheckConstraint(Oid domainoid, const char *ccbin)
{
Expr *expr = (Expr *) stringToNode(ccbin);
List *rels;
@@ -3429,12 +3504,12 @@ checkDomainOwner(HeapTuple tup)
}
/*
* domainAddConstraint - code shared between CREATE and ALTER DOMAIN
* domainAddCheckConstraint - code shared between CREATE and ALTER DOMAIN
*/
static char *
domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
int typMod, Constraint *constr,
const char *domainName, ObjectAddress *constrAddr)
domainAddCheckConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
int typMod, Constraint *constr,
const char *domainName, ObjectAddress *constrAddr)
{
Node *expr;
char *ccbin;
@@ -3442,6 +3517,8 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
CoerceToDomainValue *domVal;
Oid ccoid;
Assert(constr->contype == CONSTR_CHECK);
/*
* Assign or validate constraint name
*/
@@ -3562,9 +3639,10 @@ replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref)
{
/*
* Check for a reference to "value", and if that's what it is, replace
* with a CoerceToDomainValue as prepared for us by domainAddConstraint.
* (We handle VALUE as a name, not a keyword, to avoid breaking a lot of
* applications that have used VALUE as a column name in the past.)
* with a CoerceToDomainValue as prepared for us by
* domainAddCheckConstraint. (We handle VALUE as a name, not a keyword, to
* avoid breaking a lot of applications that have used VALUE as a column
* name in the past.)
*/
if (list_length(cref->fields) == 1)
{
@@ -3584,6 +3662,79 @@ replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref)
return NULL;
}
/*
* domainAddNotNullConstraint - code shared between CREATE and ALTER DOMAIN
*/
static void
domainAddNotNullConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
int typMod, Constraint *constr,
const char *domainName, ObjectAddress *constrAddr)
{
Oid ccoid;
Assert(constr->contype == CONSTR_NOTNULL);
/*
* Assign or validate constraint name
*/
if (constr->conname)
{
if (ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
domainOid,
constr->conname))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("constraint \"%s\" for domain \"%s\" already exists",
constr->conname, domainName)));
}
else
constr->conname = ChooseConstraintName(domainName,
NULL,
"not_null",
domainNamespace,
NIL);
/*
* Store the constraint in pg_constraint
*/
ccoid =
CreateConstraintEntry(constr->conname, /* Constraint Name */
domainNamespace, /* namespace */
CONSTRAINT_NOTNULL, /* Constraint Type */
false, /* Is Deferrable */
false, /* Is Deferred */
!constr->skip_validation, /* Is Validated */
InvalidOid, /* no parent constraint */
InvalidOid, /* not a relation constraint */
NULL,
0,
0,
domainOid, /* domain constraint */
InvalidOid, /* no associated index */
InvalidOid, /* Foreign key fields */
NULL,
NULL,
NULL,
NULL,
0,
' ',
' ',
NULL,
0,
' ',
NULL, /* not an exclusion constraint */
NULL,
NULL,
true, /* is local */
0, /* inhcount */
false, /* connoinherit */
false, /* conperiod */
false); /* is_internal */
if (constrAddr)
ObjectAddressSet(*constrAddr, ConstraintRelationId, ccoid);
}
/*
* Execute ALTER TYPE RENAME