1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-18 04:29:09 +03:00

Per-column collation support

This adds collation support for columns and domains, a COLLATE clause
to override it per expression, and B-tree index support.

Peter Eisentraut
reviewed by Pavel Stehule, Itagaki Takahiro, Robert Haas, Noah Misch
This commit is contained in:
Peter Eisentraut
2011-02-08 23:04:18 +02:00
parent 1703f0e8da
commit 414c5a2ea6
156 changed files with 4519 additions and 582 deletions

View File

@@ -1203,6 +1203,7 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
ListCell *left_tlist,
*lct,
*lcm,
*lcc,
*l;
List *targetvars,
*targetnames,
@@ -1296,10 +1297,11 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
targetnames = NIL;
left_tlist = list_head(leftmostQuery->targetList);
forboth(lct, sostmt->colTypes, lcm, sostmt->colTypmods)
forthree(lct, sostmt->colTypes, lcm, sostmt->colTypmods, lcc, sostmt->colCollations)
{
Oid colType = lfirst_oid(lct);
int32 colTypmod = lfirst_int(lcm);
Oid colCollation = lfirst_oid(lcc);
TargetEntry *lefttle = (TargetEntry *) lfirst(left_tlist);
char *colName;
TargetEntry *tle;
@@ -1311,6 +1313,7 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
lefttle->resno,
colType,
colTypmod,
colCollation,
0);
var->location = exprLocation((Node *) lefttle->expr);
tle = makeTargetEntry((Expr *) var,
@@ -1418,7 +1421,7 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
* Recursively transform leaves and internal nodes of a set-op tree
*
* In addition to returning the transformed node, we return a list of
* expression nodes showing the type, typmod, and location (for error messages)
* expression nodes showing the type, typmod, collation, and location (for error messages)
* of each output column of the set-op node. This is used only during the
* internal recursion of this function. At the upper levels we use
* SetToDefault nodes for this purpose, since they carry exactly the fields
@@ -1591,6 +1594,7 @@ transformSetOperationTree(ParseState *pstate, SelectStmt *stmt,
*colInfo = NIL;
op->colTypes = NIL;
op->colTypmods = NIL;
op->colCollations = NIL;
op->groupClauses = NIL;
forboth(lci, lcolinfo, rci, rcolinfo)
{
@@ -1604,6 +1608,7 @@ transformSetOperationTree(ParseState *pstate, SelectStmt *stmt,
SetToDefault *rescolnode;
Oid rescoltype;
int32 rescoltypmod;
Oid rescolcoll;
/* select common type, same as CASE et al */
rescoltype = select_common_type(pstate,
@@ -1615,6 +1620,12 @@ transformSetOperationTree(ParseState *pstate, SelectStmt *stmt,
rescoltypmod = lcoltypmod;
else
rescoltypmod = -1;
/* Select common collation. A common collation is
* required for all set operators except UNION ALL; see
* SQL:2008-2 7.13 SR 15c. */
rescolcoll = select_common_collation(pstate,
list_make2(lcolnode, rcolnode),
(op->op == SETOP_UNION && op->all));
/*
* Verify the coercions are actually possible. If not, we'd fail
@@ -1643,11 +1654,13 @@ transformSetOperationTree(ParseState *pstate, SelectStmt *stmt,
rescolnode = makeNode(SetToDefault);
rescolnode->typeId = rescoltype;
rescolnode->typeMod = rescoltypmod;
rescolnode->collid = rescolcoll;
rescolnode->location = exprLocation(bestexpr);
*colInfo = lappend(*colInfo, rescolnode);
op->colTypes = lappend_oid(op->colTypes, rescoltype);
op->colTypmods = lappend_int(op->colTypmods, rescoltypmod);
op->colCollations = lappend_oid(op->colCollations, rescolcoll);
/*
* For all cases except UNION ALL, identify the grouping operators

View File

@@ -261,6 +261,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_
%type <list> func_name handler_name qual_Op qual_all_Op subquery_Op
opt_class opt_inline_handler opt_validator validator_clause
opt_collate
%type <range> qualified_name OptConstrFromTable
@@ -394,7 +395,8 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_
%type <list> copy_generic_opt_list copy_generic_opt_arg_list
%type <list> copy_options
%type <typnam> Typename SimpleTypename ConstTypename
%type <typnam> Typename SimpleTypename SimpleTypenameWithoutCollation
ConstTypename
GenericType Numeric opt_float
Character ConstCharacter
CharacterWithLength CharacterWithoutLength
@@ -5323,38 +5325,45 @@ index_params: index_elem { $$ = list_make1($1); }
* expressions in parens. For backwards-compatibility reasons, we allow
* an expression that's just a function call to be written without parens.
*/
index_elem: ColId opt_class opt_asc_desc opt_nulls_order
index_elem: ColId opt_collate opt_class opt_asc_desc opt_nulls_order
{
$$ = makeNode(IndexElem);
$$->name = $1;
$$->expr = NULL;
$$->indexcolname = NULL;
$$->opclass = $2;
$$->ordering = $3;
$$->nulls_ordering = $4;
$$->collation = $2;
$$->opclass = $3;
$$->ordering = $4;
$$->nulls_ordering = $5;
}
| func_expr opt_class opt_asc_desc opt_nulls_order
| func_expr opt_collate opt_class opt_asc_desc opt_nulls_order
{
$$ = makeNode(IndexElem);
$$->name = NULL;
$$->expr = $1;
$$->indexcolname = NULL;
$$->opclass = $2;
$$->ordering = $3;
$$->nulls_ordering = $4;
$$->collation = $2;
$$->opclass = $3;
$$->ordering = $4;
$$->nulls_ordering = $5;
}
| '(' a_expr ')' opt_class opt_asc_desc opt_nulls_order
| '(' a_expr ')' opt_collate opt_class opt_asc_desc opt_nulls_order
{
$$ = makeNode(IndexElem);
$$->name = NULL;
$$->expr = $2;
$$->indexcolname = NULL;
$$->opclass = $4;
$$->ordering = $5;
$$->nulls_ordering = $6;
$$->collation = $4;
$$->opclass = $5;
$$->ordering = $6;
$$->nulls_ordering = $7;
}
;
opt_collate: COLLATE any_name { $$ = $2; }
| /*EMPTY*/ { $$ = NIL; }
;
opt_class: any_name { $$ = $1; }
| USING any_name { $$ = $2; }
| /*EMPTY*/ { $$ = NIL; }
@@ -8776,6 +8785,13 @@ opt_array_bounds:
;
SimpleTypename:
SimpleTypenameWithoutCollation opt_collate
{
$$ = $1;
$$->collnames = $2;
}
SimpleTypenameWithoutCollation:
GenericType { $$ = $1; }
| Numeric { $$ = $1; }
| Bit { $$ = $1; }
@@ -9811,6 +9827,14 @@ c_expr: columnref { $$ = $1; }
r->location = @1;
$$ = (Node *)r;
}
| c_expr COLLATE any_name
{
CollateClause *n = makeNode(CollateClause);
n->arg = (Expr *) $1;
n->collnames = $3;
n->location = @2;
$$ = (Node *)n;
}
;
/*

View File

@@ -723,6 +723,7 @@ build_aggregate_fnexprs(Oid *agg_input_types,
Oid agg_result_type,
Oid transfn_oid,
Oid finalfn_oid,
Oid collation,
Expr **transfnexpr,
Expr **finalfnexpr)
{
@@ -741,6 +742,7 @@ build_aggregate_fnexprs(Oid *agg_input_types,
argp->paramid = -1;
argp->paramtype = agg_state_type;
argp->paramtypmod = -1;
argp->paramcollation = collation;
argp->location = -1;
args = list_make1(argp);
@@ -752,6 +754,7 @@ build_aggregate_fnexprs(Oid *agg_input_types,
argp->paramid = -1;
argp->paramtype = agg_input_types[i];
argp->paramtypmod = -1;
argp->paramcollation = collation;
argp->location = -1;
args = lappend(args, argp);
}
@@ -759,6 +762,7 @@ build_aggregate_fnexprs(Oid *agg_input_types,
*transfnexpr = (Expr *) makeFuncExpr(transfn_oid,
agg_state_type,
args,
collation,
COERCE_DONTCARE);
/* see if we have a final function */
@@ -776,11 +780,13 @@ build_aggregate_fnexprs(Oid *agg_input_types,
argp->paramid = -1;
argp->paramtype = agg_state_type;
argp->paramtypmod = -1;
argp->paramcollation = collation;
argp->location = -1;
args = list_make1(argp);
*finalfnexpr = (Expr *) makeFuncExpr(finalfn_oid,
agg_result_type,
args,
collation,
COERCE_DONTCARE);
}

View File

@@ -613,7 +613,8 @@ transformRangeFunction(ParseState *pstate, RangeFunction *r)
tupdesc = BuildDescFromLists(rte->eref->colnames,
rte->funccoltypes,
rte->funccoltypmods);
rte->funccoltypmods,
rte->funccolcollations);
CheckAttributeNamesTypes(tupdesc, RELKIND_COMPOSITE_TYPE, false);
}
@@ -1935,6 +1936,7 @@ addTargetToSortList(ParseState *pstate, TargetEntry *tle,
bool resolveUnknown)
{
Oid restype = exprType((Node *) tle->expr);
Oid rescollation = exprCollation((Node *) tle->expr);
Oid sortop;
Oid eqop;
bool hashable;
@@ -2018,6 +2020,12 @@ addTargetToSortList(ParseState *pstate, TargetEntry *tle,
break;
}
if (type_is_collatable(restype) && !OidIsValid(rescollation))
ereport(ERROR,
(errcode(ERRCODE_INDETERMINATE_COLLATION),
errmsg("no collation was derived for the sort expression"),
errhint("Use the COLLATE clause to set the collation explicitly.")));
cancel_parser_errposition_callback(&pcbstate);
/* avoid making duplicate sortlist entries */

View File

@@ -16,6 +16,7 @@
#include "catalog/pg_cast.h"
#include "catalog/pg_class.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
@@ -216,6 +217,7 @@ coerce_type(ParseState *pstate, Node *node,
newcon->consttype = baseTypeId;
newcon->consttypmod = inputTypeMod;
newcon->constcollid = get_typcollation(newcon->consttype);
newcon->constlen = typeLen(targetType);
newcon->constbyval = typeByVal(targetType);
newcon->constisnull = con->constisnull;
@@ -277,6 +279,14 @@ coerce_type(ParseState *pstate, Node *node,
if (result)
return result;
}
if (IsA(node, CollateClause))
{
CollateClause *cc = (CollateClause *) node;
cc->arg = (Expr *) coerce_type(pstate, (Node *) cc->arg, inputTypeId, targetTypeId, targetTypeMod,
ccontext, cformat, location);
return (Node *) cc;
}
pathtype = find_coercion_pathway(targetTypeId, inputTypeId, ccontext,
&funcId);
if (pathtype != COERCION_PATH_NONE)
@@ -718,6 +728,7 @@ build_coercion_expression(Node *node,
FuncExpr *fexpr;
List *args;
Const *cons;
Oid collation;
Assert(OidIsValid(funcId));
@@ -749,7 +760,9 @@ build_coercion_expression(Node *node,
args = lappend(args, cons);
}
fexpr = makeFuncExpr(funcId, targetTypeId, args, cformat);
collation = coercion_expression_result_collation(targetTypeId, node);
fexpr = makeFuncExpr(funcId, targetTypeId, args, collation, cformat);
fexpr->location = location;
return (Node *) fexpr;
}
@@ -2081,3 +2094,120 @@ typeIsOfTypedTable(Oid reltypeId, Oid reloftypeId)
return result;
}
/*
* select_common_collation() -- determine one collation to apply for
* an expression node, for evaluating the expression itself or to
* label the result of the expression node.
*
* none_ok means that it is permitted to return "no" collation. It is
* then not possible to sort the result value of whatever expression
* is applying this. none_ok = true reflects the rules of SQL
* standard clause "Result of data type combinations", none_ok = false
* reflects the rules of clause "Collation determination" (in some
* cases invoked via "Grouping operations").
*/
Oid
select_common_collation(ParseState *pstate, List *exprs, bool none_ok)
{
ListCell *lc;
/*
* Check if there are any explicit collation derivations. If so,
* they must all be the same.
*/
foreach(lc, exprs)
{
Node *pexpr = (Node *) lfirst(lc);
Oid pcoll = exprCollation(pexpr);
bool pexplicit = IsA(pexpr, CollateClause);
if (pcoll && pexplicit)
{
ListCell *lc2;
for_each_cell(lc2, lnext(lc))
{
Node *nexpr = (Node *) lfirst(lc2);
Oid ncoll = exprCollation(nexpr);
bool nexplicit = IsA(nexpr, CollateClause);
if (!ncoll || !nexplicit)
continue;
if (ncoll != pcoll)
ereport(ERROR,
(errcode(ERRCODE_COLLATION_MISMATCH),
errmsg("collation mismatch between explicit collations \"%s\" and \"%s\"",
get_collation_name(pcoll),
get_collation_name(ncoll)),
parser_errposition(pstate, exprLocation(nexpr))));
}
return pcoll;
}
}
/*
* Check if there are any implicit collation derivations.
*/
foreach(lc, exprs)
{
Node *pexpr = (Node *) lfirst(lc);
Oid pcoll = exprCollation(pexpr);
if (pcoll && pcoll != DEFAULT_COLLATION_OID)
{
ListCell *lc2;
for_each_cell(lc2, lnext(lc))
{
Node *nexpr = (Node *) lfirst(lc2);
Oid ncoll = exprCollation(nexpr);
if (!ncoll || ncoll == DEFAULT_COLLATION_OID)
continue;
if (ncoll != pcoll)
{
if (none_ok)
return InvalidOid;
ereport(ERROR,
(errcode(ERRCODE_COLLATION_MISMATCH),
errmsg("collation mismatch between implicit collations \"%s\" and \"%s\"",
get_collation_name(pcoll),
get_collation_name(ncoll)),
errhint("You can override the collation by applying the COLLATE clause to one or both expressions."),
parser_errposition(pstate, exprLocation(nexpr))));
}
}
return pcoll;
}
}
foreach(lc, exprs)
{
Node *pexpr = (Node *) lfirst(lc);
Oid pcoll = exprCollation(pexpr);
if (pcoll == DEFAULT_COLLATION_OID)
{
ListCell *lc2;
for_each_cell(lc2, lnext(lc))
{
Node *nexpr = (Node *) lfirst(lc2);
Oid ncoll = exprCollation(nexpr);
if (ncoll != pcoll)
break;
}
return pcoll;
}
}
/*
* Else use default
*/
return InvalidOid;
}

View File

@@ -14,11 +14,13 @@
*/
#include "postgres.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "nodes/nodeFuncs.h"
#include "parser/analyze.h"
#include "parser/parse_cte.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
/* Enumeration of contexts in which a self-reference is disallowed */
@@ -263,11 +265,13 @@ analyzeCTE(ParseState *pstate, CommonTableExpr *cte)
*/
ListCell *lctlist,
*lctyp,
*lctypmod;
*lctypmod,
*lccoll;
int varattno;
lctyp = list_head(cte->ctecoltypes);
lctypmod = list_head(cte->ctecoltypmods);
lccoll = list_head(cte->ctecolcollations);
varattno = 0;
foreach(lctlist, query->targetList)
{
@@ -278,7 +282,7 @@ analyzeCTE(ParseState *pstate, CommonTableExpr *cte)
continue;
varattno++;
Assert(varattno == te->resno);
if (lctyp == NULL || lctypmod == NULL) /* shouldn't happen */
if (lctyp == NULL || lctypmod == NULL || lccoll == NULL) /* shouldn't happen */
elog(ERROR, "wrong number of output columns in WITH");
texpr = (Node *) te->expr;
if (exprType(texpr) != lfirst_oid(lctyp) ||
@@ -293,10 +297,20 @@ analyzeCTE(ParseState *pstate, CommonTableExpr *cte)
exprTypmod(texpr))),
errhint("Cast the output of the non-recursive term to the correct type."),
parser_errposition(pstate, exprLocation(texpr))));
if (exprCollation(texpr) != lfirst_oid(lccoll))
ereport(ERROR,
(errcode(ERRCODE_COLLATION_MISMATCH),
errmsg("recursive query \"%s\" column %d has collation \"%s\" in non-recursive term but collation \"%s\" overall",
cte->ctename, varattno,
get_collation_name(lfirst_oid(lccoll)),
get_collation_name(exprCollation(texpr))),
errhint("Use the COLLATE clause to set the collation of the non-recursive term."),
parser_errposition(pstate, exprLocation(texpr))));
lctyp = lnext(lctyp);
lctypmod = lnext(lctypmod);
lccoll = lnext(lccoll);
}
if (lctyp != NULL || lctypmod != NULL) /* shouldn't happen */
if (lctyp != NULL || lctypmod != NULL || lccoll != NULL) /* shouldn't happen */
elog(ERROR, "wrong number of output columns in WITH");
}
}
@@ -331,7 +345,7 @@ analyzeCTETargetList(ParseState *pstate, CommonTableExpr *cte, List *tlist)
* handling.)
*/
cte->ctecolnames = copyObject(cte->aliascolnames);
cte->ctecoltypes = cte->ctecoltypmods = NIL;
cte->ctecoltypes = cte->ctecoltypmods = cte->ctecolcollations = NIL;
numaliases = list_length(cte->aliascolnames);
varattno = 0;
foreach(tlistitem, tlist)
@@ -339,6 +353,7 @@ analyzeCTETargetList(ParseState *pstate, CommonTableExpr *cte, List *tlist)
TargetEntry *te = (TargetEntry *) lfirst(tlistitem);
Oid coltype;
int32 coltypmod;
Oid colcoll;
if (te->resjunk)
continue;
@@ -353,6 +368,7 @@ analyzeCTETargetList(ParseState *pstate, CommonTableExpr *cte, List *tlist)
}
coltype = exprType((Node *) te->expr);
coltypmod = exprTypmod((Node *) te->expr);
colcoll = exprCollation((Node *) te->expr);
/*
* If the CTE is recursive, force the exposed column type of any
@@ -366,9 +382,11 @@ analyzeCTETargetList(ParseState *pstate, CommonTableExpr *cte, List *tlist)
{
coltype = TEXTOID;
coltypmod = -1; /* should be -1 already, but be sure */
colcoll = DEFAULT_COLLATION_OID;
}
cte->ctecoltypes = lappend_oid(cte->ctecoltypes, coltype);
cte->ctecoltypmods = lappend_int(cte->ctecoltypmods, coltypmod);
cte->ctecolcollations = lappend_oid(cte->ctecolcollations, colcoll);
}
if (varattno < numaliases)
ereport(ERROR,

View File

@@ -65,6 +65,7 @@ static Node *transformWholeRowRef(ParseState *pstate, RangeTblEntry *rte,
static Node *transformIndirection(ParseState *pstate, Node *basenode,
List *indirection);
static Node *transformTypeCast(ParseState *pstate, TypeCast *tc);
static Node *transformCollateClause(ParseState *pstate, CollateClause *c);
static Node *make_row_comparison_op(ParseState *pstate, List *opname,
List *largs, List *rargs, int location);
static Node *make_row_distinct_op(ParseState *pstate, List *opname,
@@ -146,6 +147,12 @@ transformExpr(ParseState *pstate, Node *expr)
{
TypeCast *tc = (TypeCast *) expr;
if (tc->typeName->collnames)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("COLLATE clause not allowed in cast target"),
parser_errposition(pstate, tc->typeName->location)));
/*
* If the subject of the typecast is an ARRAY[] construct and
* the target type is an array type, we invoke
@@ -185,6 +192,10 @@ transformExpr(ParseState *pstate, Node *expr)
break;
}
case T_CollateClause:
result = transformCollateClause(pstate, (CollateClause *) expr);
break;
case T_A_Expr:
{
A_Expr *a = (A_Expr *) expr;
@@ -423,6 +434,7 @@ transformIndirection(ParseState *pstate, Node *basenode, List *indirection)
exprType(result),
InvalidOid,
exprTypmod(result),
exprCollation(result),
subscripts,
NULL);
subscripts = NIL;
@@ -444,6 +456,7 @@ transformIndirection(ParseState *pstate, Node *basenode, List *indirection)
exprType(result),
InvalidOid,
exprTypmod(result),
exprCollation(result),
subscripts,
NULL);
@@ -1267,6 +1280,7 @@ transformCaseExpr(ParseState *pstate, CaseExpr *c)
placeholder = makeNode(CaseTestExpr);
placeholder->typeId = exprType(arg);
placeholder->typeMod = exprTypmod(arg);
placeholder->collation = exprCollation(arg);
}
else
placeholder = NULL;
@@ -1351,6 +1365,8 @@ transformCaseExpr(ParseState *pstate, CaseExpr *c)
"CASE/WHEN");
}
newc->casecollation = select_common_collation(pstate, resultexprs, true);
newc->location = c->location;
return (Node *) newc;
@@ -1461,6 +1477,7 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
param->paramid = tent->resno;
param->paramtype = exprType((Node *) tent->expr);
param->paramtypmod = exprTypmod((Node *) tent->expr);
param->paramcollation = exprCollation((Node *) tent->expr);
param->location = -1;
right_list = lappend(right_list, param);
@@ -1704,6 +1721,7 @@ transformCoalesceExpr(ParseState *pstate, CoalesceExpr *c)
}
newc->args = newcoercedargs;
newc->coalescecollation = select_common_collation(pstate, newcoercedargs, true);
newc->location = c->location;
return (Node *) newc;
}
@@ -1728,6 +1746,7 @@ transformMinMaxExpr(ParseState *pstate, MinMaxExpr *m)
}
newm->minmaxtype = select_common_type(pstate, newargs, funcname, NULL);
newm->collid = select_common_collation(pstate, newargs, false);
/* Convert arguments if necessary */
foreach(args, newargs)
@@ -2082,6 +2101,36 @@ transformTypeCast(ParseState *pstate, TypeCast *tc)
return result;
}
/*
* Handle an explicit COLLATE clause.
*
* Transform the argument, and look up the collation name.
*/
static Node *
transformCollateClause(ParseState *pstate, CollateClause *c)
{
CollateClause *newc;
Oid argtype;
newc = makeNode(CollateClause);
newc->arg = (Expr *) transformExpr(pstate, (Node *) c->arg);
argtype = exprType((Node *) newc->arg);
/* The unknown type is not collatable, but coerce_type() takes
* care of it separately, so we'll let it go here. */
if (!type_is_collatable(argtype) && argtype != UNKNOWNOID)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("collations are not supported by type %s",
format_type_be(argtype))));
newc->collOid = LookupCollation(pstate, c->collnames, c->location);
newc->collnames = c->collnames;
newc->location = c->location;
return (Node *) newc;
}
/*
* Transform a "row compare-op row" construct
*
@@ -2103,6 +2152,7 @@ make_row_comparison_op(ParseState *pstate, List *opname,
List *opexprs;
List *opnos;
List *opfamilies;
List *collids;
ListCell *l,
*r;
List **opfamily_lists;
@@ -2273,6 +2323,7 @@ make_row_comparison_op(ParseState *pstate, List *opname,
* possibility that make_op inserted coercion operations.
*/
opnos = NIL;
collids = NIL;
largs = NIL;
rargs = NIL;
foreach(l, opexprs)
@@ -2280,6 +2331,7 @@ make_row_comparison_op(ParseState *pstate, List *opname,
OpExpr *cmp = (OpExpr *) lfirst(l);
opnos = lappend_oid(opnos, cmp->opno);
collids = lappend_oid(collids, cmp->collid);
largs = lappend(largs, linitial(cmp->args));
rargs = lappend(rargs, lsecond(cmp->args));
}
@@ -2288,6 +2340,7 @@ make_row_comparison_op(ParseState *pstate, List *opname,
rcexpr->rctype = rctype;
rcexpr->opnos = opnos;
rcexpr->opfamilies = opfamilies;
rcexpr->collids = collids;
rcexpr->largs = largs;
rcexpr->rargs = rargs;

View File

@@ -78,6 +78,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
bool retset;
int nvargs;
FuncDetailCode fdresult;
Oid funccollid;
/*
* Most of the rest of the parser just assumes that functions do not have
@@ -343,6 +344,12 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
/* perform the necessary typecasting of arguments */
make_fn_arguments(pstate, fargs, actual_arg_types, declared_arg_types);
/* XXX: If we knew which functions required collation information,
* we could selectively set the last argument to true here. */
funccollid = select_common_collation(pstate, fargs, false);
if (!OidIsValid(funccollid))
funccollid = get_typcollation(rettype);
/*
* If it's a variadic function call, transform the last nvargs arguments
* into an array --- unless it's an "any" variadic.
@@ -383,6 +390,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
funcexpr->funcretset = retset;
funcexpr->funcformat = COERCE_EXPLICIT_CALL;
funcexpr->args = fargs;
funcexpr->collid = funccollid;
funcexpr->location = location;
retval = (Node *) funcexpr;
@@ -396,6 +404,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
aggref->aggtype = rettype;
/* args, aggorder, aggdistinct will be set by transformAggregateCall */
aggref->aggstar = agg_star;
aggref->collid = funccollid;
/* agglevelsup will be set by transformAggregateCall */
aggref->location = location;
@@ -453,6 +462,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
/* winref will be set by transformWindowFuncCall */
wfunc->winstar = agg_star;
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->collid = funccollid;
wfunc->location = location;
/*
@@ -1303,7 +1313,7 @@ FuncNameAsType(List *funcname)
Oid result;
Type typtup;
typtup = LookupTypeName(NULL, makeTypeNameFromNameList(funcname), NULL);
typtup = LookupTypeName(NULL, makeTypeNameFromNameList(funcname), NULL, NULL);
if (typtup == NULL)
return InvalidOid;
@@ -1380,6 +1390,7 @@ ParseComplexProjection(ParseState *pstate, char *funcname, Node *first_arg,
fselect->fieldnum = i + 1;
fselect->resulttype = att->atttypid;
fselect->resulttypmod = att->atttypmod;
fselect->resultcollation = att->attcollation;
return (Node *) fselect;
}
}
@@ -1489,7 +1500,7 @@ LookupTypeNameOid(const TypeName *typename)
Oid result;
Type typtup;
typtup = LookupTypeName(NULL, typename, NULL);
typtup = LookupTypeName(NULL, typename, NULL, NULL);
if (typtup == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),

View File

@@ -189,10 +189,11 @@ make_var(ParseState *pstate, RangeTblEntry *rte, int attrno, int location)
sublevels_up;
Oid vartypeid;
int32 type_mod;
Oid varcollid;
vnum = RTERangeTablePosn(pstate, rte, &sublevels_up);
get_rte_attribute_type(rte, attrno, &vartypeid, &type_mod);
result = makeVar(vnum, attrno, vartypeid, type_mod, sublevels_up);
get_rte_attribute_type(rte, attrno, &vartypeid, &type_mod, &varcollid);
result = makeVar(vnum, attrno, vartypeid, type_mod, varcollid, sublevels_up);
result->location = location;
return result;
}
@@ -269,6 +270,7 @@ transformArrayType(Oid *arrayType, int32 *arrayTypmod)
* elementType OID of array's element type (fetch with transformArrayType,
* or pass InvalidOid to do it here)
* arrayTypMod typmod for the array (which is also typmod for the elements)
* arrayColl OID of collation of array and array's elements
* indirection Untransformed list of subscripts (must not be NIL)
* assignFrom NULL for array fetch, else transformed expression for source.
*/
@@ -278,6 +280,7 @@ transformArraySubscripts(ParseState *pstate,
Oid arrayType,
Oid elementType,
int32 arrayTypMod,
Oid arrayColl,
List *indirection,
Node *assignFrom)
{
@@ -404,6 +407,7 @@ transformArraySubscripts(ParseState *pstate,
aref->refarraytype = arrayType;
aref->refelemtype = elementType;
aref->reftypmod = arrayTypMod;
aref->refcollid = arrayColl;
aref->refupperindexpr = upperIndexpr;
aref->reflowerindexpr = lowerIndexpr;
aref->refexpr = (Expr *) arrayBase;

View File

@@ -782,6 +782,7 @@ make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree,
List *args;
Oid rettype;
OpExpr *result;
Oid opcollid;
/* Select the operator */
if (rtree == NULL)
@@ -861,6 +862,12 @@ make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree,
/* perform the necessary typecasting of arguments */
make_fn_arguments(pstate, args, actual_arg_types, declared_arg_types);
/* XXX: If we knew which functions required collation information,
* we could selectively set the last argument to true here. */
opcollid = select_common_collation(pstate, args, false);
if (!OidIsValid(opcollid))
opcollid = get_typcollation(rettype);
/* and build the expression node */
result = makeNode(OpExpr);
result->opno = oprid(tup);
@@ -868,6 +875,7 @@ make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree,
result->opresulttype = rettype;
result->opretset = get_func_retset(opform->oprcode);
result->args = args;
result->collid = opcollid;
result->location = location;
ReleaseSysCache(tup);
@@ -896,6 +904,7 @@ make_scalar_array_op(ParseState *pstate, List *opname,
List *args;
Oid rettype;
ScalarArrayOpExpr *result;
Oid opcollid;
ltypeId = exprType(ltree);
atypeId = exprType(rtree);
@@ -990,12 +999,19 @@ make_scalar_array_op(ParseState *pstate, List *opname,
/* perform the necessary typecasting of arguments */
make_fn_arguments(pstate, args, actual_arg_types, declared_arg_types);
/* XXX: If we knew which functions required collation information,
* we could selectively set the last argument to true here. */
opcollid = select_common_collation(pstate, args, false);
if (!OidIsValid(opcollid))
opcollid = get_typcollation(rettype);
/* and build the expression node */
result = makeNode(ScalarArrayOpExpr);
result->opno = oprid(tup);
result->opfuncid = opform->oprcode;
result->useOr = useOr;
result->args = args;
result->collid = opcollid;
result->location = location;
ReleaseSysCache(tup);

View File

@@ -30,6 +30,7 @@
#include "nodes/nodeFuncs.h"
#include "parser/parse_param.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
typedef struct FixedParamState
@@ -113,6 +114,7 @@ fixed_paramref_hook(ParseState *pstate, ParamRef *pref)
param->paramid = paramno;
param->paramtype = parstate->paramTypes[paramno - 1];
param->paramtypmod = -1;
param->paramcollation = get_typcollation(param->paramtype);
param->location = pref->location;
return (Node *) param;
@@ -165,6 +167,7 @@ variable_paramref_hook(ParseState *pstate, ParamRef *pref)
param->paramid = paramno;
param->paramtype = *pptype;
param->paramtypmod = -1;
param->paramcollation = get_typcollation(param->paramtype);
param->location = pref->location;
return (Node *) param;

View File

@@ -1098,6 +1098,7 @@ addRangeTableEntryForFunction(ParseState *pstate,
rte->funcexpr = funcexpr;
rte->funccoltypes = NIL;
rte->funccoltypmods = NIL;
rte->funccolcollations = NIL;
rte->alias = alias;
eref = makeAlias(alias ? alias->aliasname : funcname, NIL);
@@ -1157,6 +1158,7 @@ addRangeTableEntryForFunction(ParseState *pstate,
char *attrname;
Oid attrtype;
int32 attrtypmod;
Oid attrcollation;
attrname = pstrdup(n->colname);
if (n->typeName->setof)
@@ -1165,10 +1167,11 @@ addRangeTableEntryForFunction(ParseState *pstate,
errmsg("column \"%s\" cannot be declared SETOF",
attrname),
parser_errposition(pstate, n->typeName->location)));
typenameTypeIdAndMod(pstate, n->typeName, &attrtype, &attrtypmod);
typenameTypeIdModColl(pstate, n->typeName, &attrtype, &attrtypmod, &attrcollation);
eref->colnames = lappend(eref->colnames, makeString(attrname));
rte->funccoltypes = lappend_oid(rte->funccoltypes, attrtype);
rte->funccoltypmods = lappend_int(rte->funccoltypmods, attrtypmod);
rte->funccolcollations = lappend_oid(rte->funccolcollations, attrcollation);
}
}
else
@@ -1381,6 +1384,7 @@ addRangeTableEntryForCTE(ParseState *pstate,
rte->ctecoltypes = cte->ctecoltypes;
rte->ctecoltypmods = cte->ctecoltypmods;
rte->ctecolcollations = cte->ctecolcollations;
rte->alias = alias;
if (alias)
@@ -1573,6 +1577,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
varnode = makeVar(rtindex, varattno,
exprType((Node *) te->expr),
exprTypmod((Node *) te->expr),
exprCollation((Node *) te->expr),
sublevels_up);
varnode->location = location;
@@ -1612,6 +1617,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
varnode = makeVar(rtindex, 1,
funcrettype, -1,
exprCollation(rte->funcexpr),
sublevels_up);
varnode->location = location;
@@ -1626,12 +1632,14 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
{
ListCell *l1;
ListCell *l2;
ListCell *l3;
int attnum = 0;
forboth(l1, rte->funccoltypes, l2, rte->funccoltypmods)
forthree(l1, rte->funccoltypes, l2, rte->funccoltypmods, l3, rte->funccolcollations)
{
Oid attrtype = lfirst_oid(l1);
int32 attrtypmod = lfirst_int(l2);
Oid attrcollation = lfirst_oid(l3);
Var *varnode;
attnum++;
@@ -1639,6 +1647,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
attnum,
attrtype,
attrtypmod,
attrcollation,
sublevels_up);
varnode->location = location;
*colvars = lappend(*colvars, varnode);
@@ -1681,6 +1690,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
varnode = makeVar(rtindex, varattno,
exprType(col),
exprTypmod(col),
exprCollation(col),
sublevels_up);
varnode->location = location;
*colvars = lappend(*colvars, varnode);
@@ -1740,6 +1750,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
varnode = makeVar(rtindex, varattno,
exprType(avar),
exprTypmod(avar),
exprCollation(avar),
sublevels_up);
varnode->location = location;
@@ -1753,12 +1764,14 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
ListCell *aliasp_item = list_head(rte->eref->colnames);
ListCell *lct;
ListCell *lcm;
ListCell *lcc;
varattno = 0;
forboth(lct, rte->ctecoltypes, lcm, rte->ctecoltypmods)
forthree(lct, rte->ctecoltypes, lcm, rte->ctecoltypmods, lcc, rte->ctecolcollations)
{
Oid coltype = lfirst_oid(lct);
int32 coltypmod = lfirst_int(lcm);
Oid colcoll = lfirst_oid(lcc);
varattno++;
@@ -1776,7 +1789,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
Var *varnode;
varnode = makeVar(rtindex, varattno,
coltype, coltypmod,
coltype, coltypmod, colcoll,
sublevels_up);
*colvars = lappend(*colvars, varnode);
}
@@ -1857,7 +1870,7 @@ expandTupleDesc(TupleDesc tupdesc, Alias *eref,
Var *varnode;
varnode = makeVar(rtindex, attr->attnum,
attr->atttypid, attr->atttypmod,
attr->atttypid, attr->atttypmod, attr->attcollation,
sublevels_up);
varnode->location = location;
@@ -1968,7 +1981,7 @@ get_rte_attribute_name(RangeTblEntry *rte, AttrNumber attnum)
*/
void
get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
Oid *vartype, int32 *vartypmod)
Oid *vartype, int32 *vartypmod, Oid *varcollid)
{
switch (rte->rtekind)
{
@@ -1998,6 +2011,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
get_rel_name(rte->relid))));
*vartype = att_tup->atttypid;
*vartypmod = att_tup->atttypmod;
*varcollid = att_tup->attcollation;
ReleaseSysCache(tp);
}
break;
@@ -2012,6 +2026,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
rte->eref->aliasname, attnum);
*vartype = exprType((Node *) te->expr);
*vartypmod = exprTypmod((Node *) te->expr);
*varcollid = exprCollation((Node *) te->expr);
}
break;
case RTE_FUNCTION:
@@ -2053,17 +2068,20 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
rte->eref->aliasname)));
*vartype = att_tup->atttypid;
*vartypmod = att_tup->atttypmod;
*varcollid = att_tup->attcollation;
}
else if (functypclass == TYPEFUNC_SCALAR)
{
/* Base data type, i.e. scalar */
*vartype = funcrettype;
*vartypmod = -1;
*varcollid = exprCollation(rte->funcexpr);
}
else if (functypclass == TYPEFUNC_RECORD)
{
*vartype = list_nth_oid(rte->funccoltypes, attnum - 1);
*vartypmod = list_nth_int(rte->funccoltypmods, attnum - 1);
*varcollid = list_nth_oid(rte->funccolcollations, attnum - 1);
}
else
{
@@ -2084,6 +2102,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
col = (Node *) list_nth(collist, attnum - 1);
*vartype = exprType(col);
*vartypmod = exprTypmod(col);
*varcollid = exprCollation(col);
}
break;
case RTE_JOIN:
@@ -2097,6 +2116,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
aliasvar = (Node *) list_nth(rte->joinaliasvars, attnum - 1);
*vartype = exprType(aliasvar);
*vartypmod = exprTypmod(aliasvar);
*varcollid = exprCollation(aliasvar);
}
break;
case RTE_CTE:
@@ -2105,6 +2125,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
Assert(attnum > 0 && attnum <= list_length(rte->ctecoltypes));
*vartype = list_nth_oid(rte->ctecoltypes, attnum - 1);
*vartypmod = list_nth_int(rte->ctecoltypmods, attnum - 1);
*varcollid = list_nth_oid(rte->ctecolcollations, attnum - 1);
}
break;
default:

View File

@@ -374,6 +374,7 @@ transformAssignedExpr(ParseState *pstate,
Oid type_id; /* type of value provided */
Oid attrtype; /* type of target column */
int32 attrtypmod;
Oid attrcollation;
Relation rd = pstate->p_target_relation;
Assert(rd != NULL);
@@ -385,6 +386,7 @@ transformAssignedExpr(ParseState *pstate,
parser_errposition(pstate, location)));
attrtype = attnumTypeId(rd, attrno);
attrtypmod = rd->rd_att->attrs[attrno - 1]->atttypmod;
attrcollation = rd->rd_att->attrs[attrno - 1]->attcollation;
/*
* If the expression is a DEFAULT placeholder, insert the attribute's
@@ -400,6 +402,7 @@ transformAssignedExpr(ParseState *pstate,
def->typeId = attrtype;
def->typeMod = attrtypmod;
def->collid = attrcollation;
if (indirection)
{
if (IsA(linitial(indirection), A_Indices))
@@ -786,6 +789,7 @@ transformAssignmentSubscripts(ParseState *pstate,
arrayType,
elementTypeId,
arrayTypMod,
InvalidOid,
subscripts,
rhs);
@@ -1267,6 +1271,7 @@ ExpandRowReference(ParseState *pstate, Node *expr,
fselect->fieldnum = i + 1;
fselect->resulttype = att->atttypid;
fselect->resulttypmod = att->atttypmod;
fselect->resultcollation = att->attcollation;
if (targetlist)
{
@@ -1338,6 +1343,8 @@ expandRecordVariable(ParseState *pstate, Var *var, int levelsup)
exprType(varnode),
exprTypmod(varnode),
0);
TupleDescInitEntryCollation(tupleDesc, i,
exprCollation(varnode));
i++;
}
Assert(lname == NULL && lvar == NULL); /* lists same length? */
@@ -1583,6 +1590,8 @@ FigureColnameInternal(Node *node, char **name)
}
}
break;
case T_CollateClause:
return FigureColnameInternal((Node *) ((CollateClause *) node)->arg, name);
case T_CaseExpr:
strength = FigureColnameInternal((Node *) ((CaseExpr *) node)->defresult,
name);

View File

@@ -29,6 +29,8 @@
static int32 typenameTypeMod(ParseState *pstate, const TypeName *typeName,
Type typ);
static Oid typenameCollation(ParseState *pstate, const TypeName *typeName,
Type typ);
/*
@@ -36,7 +38,8 @@ static int32 typenameTypeMod(ParseState *pstate, const TypeName *typeName,
* Given a TypeName object, lookup the pg_type syscache entry of the type.
* Returns NULL if no such type can be found. If the type is found,
* the typmod value represented in the TypeName struct is computed and
* stored into *typmod_p.
* stored into *typmod_p, and the collation is looked up and stored into
* *colloid_p.
*
* NB: on success, the caller must ReleaseSysCache the type tuple when done
* with it.
@@ -51,15 +54,18 @@ static int32 typenameTypeMod(ParseState *pstate, const TypeName *typeName,
* found but is a shell, and there is typmod decoration, an error will be
* thrown --- this is intentional.
*
* colloid_p can also be null.
*
* pstate is only used for error location info, and may be NULL.
*/
Type
LookupTypeName(ParseState *pstate, const TypeName *typeName,
int32 *typmod_p)
int32 *typmod_p, Oid *collid_p)
{
Oid typoid;
HeapTuple tup;
int32 typmod;
Oid collid;
if (typeName->names == NIL)
{
@@ -174,6 +180,11 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName,
if (typmod_p)
*typmod_p = typmod;
collid = typenameCollation(pstate, typeName, (Type) tup);
if (collid_p)
*collid_p = collid;
return (Type) tup;
}
@@ -185,11 +196,11 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName,
* Callers of this can therefore assume the result is a fully valid type.
*/
Type
typenameType(ParseState *pstate, const TypeName *typeName, int32 *typmod_p)
typenameType(ParseState *pstate, const TypeName *typeName, int32 *typmod_p, Oid *collid_p)
{
Type tup;
tup = LookupTypeName(pstate, typeName, typmod_p);
tup = LookupTypeName(pstate, typeName, typmod_p, collid_p);
if (tup == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
@@ -217,7 +228,7 @@ typenameTypeId(ParseState *pstate, const TypeName *typeName)
Oid typoid;
Type tup;
tup = typenameType(pstate, typeName, NULL);
tup = typenameType(pstate, typeName, NULL, NULL);
typoid = HeapTupleGetOid(tup);
ReleaseSysCache(tup);
@@ -236,7 +247,25 @@ typenameTypeIdAndMod(ParseState *pstate, const TypeName *typeName,
{
Type tup;
tup = typenameType(pstate, typeName, typmod_p);
tup = typenameType(pstate, typeName, typmod_p, NULL);
*typeid_p = HeapTupleGetOid(tup);
ReleaseSysCache(tup);
}
/*
* typenameTypeIdModColl - given a TypeName, return the type's OID,
* typmod, and collation
*
* This is equivalent to typenameType, but we only hand back the type OID,
* typmod, and collation, not the syscache entry.
*/
void
typenameTypeIdModColl(ParseState *pstate, const TypeName *typeName,
Oid *typeid_p, int32 *typmod_p, Oid *collid_p)
{
Type tup;
tup = typenameType(pstate, typeName, typmod_p, collid_p);
*typeid_p = HeapTupleGetOid(tup);
ReleaseSysCache(tup);
}
@@ -350,6 +379,62 @@ typenameTypeMod(ParseState *pstate, const TypeName *typeName, Type typ)
return result;
}
/*
* typenameCollation - given a TypeName, return the collation OID
*
* This will throw an error if the TypeName includes a collation but
* the data type does not support collations.
*
* The actual type OID represented by the TypeName must already have been
* looked up, and is passed as "typ".
*
* pstate is only used for error location info, and may be NULL.
*/
static Oid
typenameCollation(ParseState *pstate, const TypeName *typeName, Type typ)
{
Oid typcollation = ((Form_pg_type) GETSTRUCT(typ))->typcollation;
/* return prespecified collation OID if no collation name specified */
if (typeName->collnames == NIL)
{
if (typeName->collOid == InvalidOid)
return typcollation;
else
return typeName->collOid;
}
if (!OidIsValid(typcollation))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("collations are not supported by type %s",
format_type_be(HeapTupleGetOid(typ))),
parser_errposition(pstate, typeName->location)));
return LookupCollation(pstate, typeName->collnames, typeName->location);
}
/*
* LookupCollation
*
* Look up collation by name, return OID, with support for error
* location.
*/
Oid
LookupCollation(ParseState *pstate, List *collnames, int location)
{
Oid colloid;
ParseCallbackState pcbstate;
setup_parser_errposition_callback(&pcbstate, pstate, location);
colloid = get_collation_oid(collnames, false);
cancel_parser_errposition_callback(&pcbstate);
return colloid;
}
/*
* appendTypeNameToBuffer
* Append a string representing the name of a TypeName to a StringInfo.

View File

@@ -627,7 +627,8 @@ transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
def = makeNode(ColumnDef);
def->colname = pstrdup(attributeName);
def->typeName = makeTypeNameFromOid(attribute->atttypid,
attribute->atttypmod);
attribute->atttypmod,
attribute->attcollation);
def->inhcount = 0;
def->is_local = true;
def->is_not_null = attribute->attnotnull;
@@ -821,7 +822,7 @@ transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
AssertArg(ofTypename);
tuple = typenameType(NULL, ofTypename, NULL);
tuple = typenameType(NULL, ofTypename, NULL, NULL);
typ = (Form_pg_type) GETSTRUCT(tuple);
ofTypeId = HeapTupleGetOid(tuple);
ofTypename->typeOid = ofTypeId; /* cached for later */
@@ -842,7 +843,7 @@ transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
continue;
n->colname = pstrdup(NameStr(attr->attname));
n->typeName = makeTypeNameFromOid(attr->atttypid, attr->atttypmod);
n->typeName = makeTypeNameFromOid(attr->atttypid, attr->atttypmod, attr->attcollation);
n->constraints = NULL;
n->is_local = true;
n->is_from_type = true;
@@ -2446,7 +2447,7 @@ transformColumnType(CreateStmtContext *cxt, ColumnDef *column)
/*
* All we really need to do here is verify that the type is valid.
*/
Type ctype = typenameType(cxt->pstate, column->typeName, NULL);
Type ctype = typenameType(cxt->pstate, column->typeName, NULL, NULL);
ReleaseSysCache(ctype);
}