1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-29 22:49:41 +03:00

Support domains over composite types.

This is the last major omission in our domains feature: you can now
make a domain over anything that's not a pseudotype.

The major complication from an implementation standpoint is that places
that might be creating tuples of a domain type now need to be prepared
to apply domain_check().  It seems better that unprepared code fail
with an error like "<type> is not composite" than that it silently fail
to apply domain constraints.  Therefore, relevant infrastructure like
get_func_result_type() and lookup_rowtype_tupdesc() has been adjusted
to treat domain-over-composite as a distinct case that unprepared code
won't recognize, rather than just transparently treating it the same
as plain composite.  This isn't a 100% solution to the possibility of
overlooked domain checks, but it catches most places.

In passing, improve typcache.c's support for domains (it can now cache
the identity of a domain's base type), and rewrite the argument handling
logic in jsonfuncs.c's populate_record[set]_worker to reduce duplicative
per-call lookups.

I believe this is code-complete so far as the core and contrib code go.
The PLs need varying amounts of work, which will be tackled in followup
patches.

Discussion: https://postgr.es/m/4206.1499798337@sss.pgh.pa.us
This commit is contained in:
Tom Lane
2017-10-26 13:47:45 -04:00
parent 08f1e1f0a4
commit 37a795a60b
37 changed files with 1085 additions and 293 deletions

View File

@@ -499,9 +499,26 @@ coerce_type(ParseState *pstate, Node *node,
* Input class type is a subclass of target, so generate an
* appropriate runtime conversion (removing unneeded columns and
* possibly rearranging the ones that are wanted).
*
* We will also get here when the input is a domain over a subclass of
* the target type. To keep life simple for the executor, we define
* ConvertRowtypeExpr as only working between regular composite types;
* therefore, in such cases insert a RelabelType to smash the input
* expression down to its base type.
*/
Oid baseTypeId = getBaseType(inputTypeId);
ConvertRowtypeExpr *r = makeNode(ConvertRowtypeExpr);
if (baseTypeId != inputTypeId)
{
RelabelType *rt = makeRelabelType((Expr *) node,
baseTypeId, -1,
InvalidOid,
COERCE_IMPLICIT_CAST);
rt->location = location;
node = (Node *) rt;
}
r->arg = (Expr *) node;
r->resulttype = targetTypeId;
r->convertformat = cformat;
@@ -966,6 +983,8 @@ coerce_record_to_complex(ParseState *pstate, Node *node,
int location)
{
RowExpr *rowexpr;
Oid baseTypeId;
int32 baseTypeMod = -1;
TupleDesc tupdesc;
List *args = NIL;
List *newargs;
@@ -1001,7 +1020,14 @@ coerce_record_to_complex(ParseState *pstate, Node *node,
format_type_be(targetTypeId)),
parser_coercion_errposition(pstate, location, node)));
tupdesc = lookup_rowtype_tupdesc(targetTypeId, -1);
/*
* Look up the composite type, accounting for possibility that what we are
* given is a domain over composite.
*/
baseTypeId = getBaseTypeAndTypmod(targetTypeId, &baseTypeMod);
tupdesc = lookup_rowtype_tupdesc(baseTypeId, baseTypeMod);
/* Process the fields */
newargs = NIL;
ucolno = 1;
arg = list_head(args);
@@ -1070,10 +1096,22 @@ coerce_record_to_complex(ParseState *pstate, Node *node,
rowexpr = makeNode(RowExpr);
rowexpr->args = newargs;
rowexpr->row_typeid = targetTypeId;
rowexpr->row_typeid = baseTypeId;
rowexpr->row_format = cformat;
rowexpr->colnames = NIL; /* not needed for named target type */
rowexpr->location = location;
/* If target is a domain, apply constraints */
if (baseTypeId != targetTypeId)
{
rowexpr->row_format = COERCE_IMPLICIT_CAST;
return coerce_to_domain((Node *) rowexpr,
baseTypeId, baseTypeMod,
targetTypeId,
ccontext, cformat, location,
false);
}
return (Node *) rowexpr;
}
@@ -2401,13 +2439,13 @@ is_complex_array(Oid typid)
/*
* Check whether reltypeId is the row type of a typed table of type
* reloftypeId. (This is conceptually similar to the subtype
* relationship checked by typeInheritsFrom().)
* reloftypeId, or is a domain over such a row type. (This is conceptually
* similar to the subtype relationship checked by typeInheritsFrom().)
*/
static bool
typeIsOfTypedTable(Oid reltypeId, Oid reloftypeId)
{
Oid relid = typeidTypeRelid(reltypeId);
Oid relid = typeOrDomainTypeRelid(reltypeId);
bool result = false;
if (relid)

View File

@@ -1819,18 +1819,19 @@ ParseComplexProjection(ParseState *pstate, char *funcname, Node *first_arg,
}
/*
* Else do it the hard way with get_expr_result_type().
* Else do it the hard way with get_expr_result_tupdesc().
*
* If it's a Var of type RECORD, we have to work even harder: we have to
* find what the Var refers to, and pass that to get_expr_result_type.
* find what the Var refers to, and pass that to get_expr_result_tupdesc.
* That task is handled by expandRecordVariable().
*/
if (IsA(first_arg, Var) &&
((Var *) first_arg)->vartype == RECORDOID)
tupdesc = expandRecordVariable(pstate, (Var *) first_arg, 0);
else if (get_expr_result_type(first_arg, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
else
tupdesc = get_expr_result_tupdesc(first_arg, true);
if (!tupdesc)
return NULL; /* unresolvable RECORD type */
Assert(tupdesc);
for (i = 0; i < tupdesc->natts; i++)
{

View File

@@ -1496,7 +1496,8 @@ addRangeTableEntryForFunction(ParseState *pstate,
parser_errposition(pstate, exprLocation(funcexpr))));
}
if (functypclass == TYPEFUNC_COMPOSITE)
if (functypclass == TYPEFUNC_COMPOSITE ||
functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
{
/* Composite data type, e.g. a table's row type */
Assert(tupdesc);
@@ -2245,7 +2246,8 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
functypclass = get_expr_result_type(rtfunc->funcexpr,
&funcrettype,
&tupdesc);
if (functypclass == TYPEFUNC_COMPOSITE)
if (functypclass == TYPEFUNC_COMPOSITE ||
functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
{
/* Composite data type, e.g. a table's row type */
Assert(tupdesc);
@@ -2765,7 +2767,8 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
&funcrettype,
&tupdesc);
if (functypclass == TYPEFUNC_COMPOSITE)
if (functypclass == TYPEFUNC_COMPOSITE ||
functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
{
/* Composite data type, e.g. a table's row type */
Form_pg_attribute att_tup;
@@ -2966,14 +2969,11 @@ get_rte_attribute_is_dropped(RangeTblEntry *rte, AttrNumber attnum)
if (attnum > atts_done &&
attnum <= atts_done + rtfunc->funccolcount)
{
TypeFuncClass functypclass;
Oid funcrettype;
TupleDesc tupdesc;
functypclass = get_expr_result_type(rtfunc->funcexpr,
&funcrettype,
&tupdesc);
if (functypclass == TYPEFUNC_COMPOSITE)
tupdesc = get_expr_result_tupdesc(rtfunc->funcexpr,
true);
if (tupdesc)
{
/* Composite data type, e.g. a table's row type */
Form_pg_attribute att_tup;

View File

@@ -725,6 +725,8 @@ transformAssignmentIndirection(ParseState *pstate,
else
{
FieldStore *fstore;
Oid baseTypeId;
int32 baseTypeMod;
Oid typrelid;
AttrNumber attnum;
Oid fieldTypeId;
@@ -752,7 +754,14 @@ transformAssignmentIndirection(ParseState *pstate,
/* No subscripts, so can process field selection here */
typrelid = typeidTypeRelid(targetTypeId);
/*
* Look up the composite type, accounting for possibility that
* what we are given is a domain over composite.
*/
baseTypeMod = targetTypMod;
baseTypeId = getBaseTypeAndTypmod(targetTypeId, &baseTypeMod);
typrelid = typeidTypeRelid(baseTypeId);
if (!typrelid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
@@ -796,7 +805,17 @@ transformAssignmentIndirection(ParseState *pstate,
fstore->arg = (Expr *) basenode;
fstore->newvals = list_make1(rhs);
fstore->fieldnums = list_make1_int(attnum);
fstore->resulttype = targetTypeId;
fstore->resulttype = baseTypeId;
/* If target is a domain, apply constraints */
if (baseTypeId != targetTypeId)
return coerce_to_domain((Node *) fstore,
baseTypeId, baseTypeMod,
targetTypeId,
COERCION_IMPLICIT,
COERCE_IMPLICIT_CAST,
location,
false);
return (Node *) fstore;
}
@@ -1164,7 +1183,7 @@ ExpandColumnRefStar(ParseState *pstate, ColumnRef *cref,
Node *node;
node = pstate->p_post_columnref_hook(pstate, cref,
(Node *) rte);
(Node *) rte);
if (node != NULL)
{
if (rte != NULL)
@@ -1387,22 +1406,18 @@ ExpandRowReference(ParseState *pstate, Node *expr,
* (This can be pretty inefficient if the expression involves nontrivial
* computation :-(.)
*
* Verify it's a composite type, and get the tupdesc. We use
* get_expr_result_type() because that can handle references to functions
* returning anonymous record types. If that fails, use
* lookup_rowtype_tupdesc(), which will almost certainly fail as well, but
* it will give an appropriate error message.
* Verify it's a composite type, and get the tupdesc.
* get_expr_result_tupdesc() handles this conveniently.
*
* If it's a Var of type RECORD, we have to work even harder: we have to
* find what the Var refers to, and pass that to get_expr_result_type.
* find what the Var refers to, and pass that to get_expr_result_tupdesc.
* That task is handled by expandRecordVariable().
*/
if (IsA(expr, Var) &&
((Var *) expr)->vartype == RECORDOID)
tupleDesc = expandRecordVariable(pstate, (Var *) expr, 0);
else if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
exprTypmod(expr));
else
tupleDesc = get_expr_result_tupdesc(expr, false);
Assert(tupleDesc);
/* Generate a list of references to the individual fields */
@@ -1610,15 +1625,9 @@ expandRecordVariable(ParseState *pstate, Var *var, int levelsup)
/*
* We now have an expression we can't expand any more, so see if
* get_expr_result_type() can do anything with it. If not, pass to
* lookup_rowtype_tupdesc() which will probably fail, but will give an
* appropriate error message while failing.
* get_expr_result_tupdesc() can do anything with it.
*/
if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
exprTypmod(expr));
return tupleDesc;
return get_expr_result_tupdesc(expr, false);
}

View File

@@ -641,7 +641,10 @@ stringTypeDatum(Type tp, char *string, int32 atttypmod)
return OidInputFunctionCall(typinput, string, typioparam, atttypmod);
}
/* given a typeid, return the type's typrelid (associated relation, if any) */
/*
* Given a typeid, return the type's typrelid (associated relation), if any.
* Returns InvalidOid if type is not a composite type.
*/
Oid
typeidTypeRelid(Oid type_id)
{
@@ -652,13 +655,44 @@ typeidTypeRelid(Oid type_id)
typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_id));
if (!HeapTupleIsValid(typeTuple))
elog(ERROR, "cache lookup failed for type %u", type_id);
type = (Form_pg_type) GETSTRUCT(typeTuple);
result = type->typrelid;
ReleaseSysCache(typeTuple);
return result;
}
/*
* Given a typeid, return the type's typrelid (associated relation), if any.
* Returns InvalidOid if type is not a composite type or a domain over one.
* This is the same as typeidTypeRelid(getBaseType(type_id)), but faster.
*/
Oid
typeOrDomainTypeRelid(Oid type_id)
{
HeapTuple typeTuple;
Form_pg_type type;
Oid result;
for (;;)
{
typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_id));
if (!HeapTupleIsValid(typeTuple))
elog(ERROR, "cache lookup failed for type %u", type_id);
type = (Form_pg_type) GETSTRUCT(typeTuple);
if (type->typtype != TYPTYPE_DOMAIN)
{
/* Not a domain, so done looking through domains */
break;
}
/* It is a domain, so examine the base type instead */
type_id = type->typbasetype;
ReleaseSysCache(typeTuple);
}
result = type->typrelid;
ReleaseSysCache(typeTuple);
return result;
}
/*
* error context callback for parse failure during parseTypeString()
*/