1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-31 22:04:40 +03:00

Fix handling of collations in multi-row VALUES constructs.

Per spec we ought to apply select_common_collation() across the expressions
in each column of the VALUES table.  The original coding was just taking
the first row and assuming it was representative.

This patch adds a field to struct RangeTblEntry to carry the resolved
collations, so initdb is forced for changes in stored rule representation.
This commit is contained in:
Tom Lane
2011-04-18 15:31:52 -04:00
parent 04db0fdbfa
commit 918854cc08
13 changed files with 131 additions and 53 deletions

View File

@ -536,7 +536,9 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
* RTE.
*/
List *exprsLists = NIL;
List *collations = NIL;
int sublist_length = -1;
int i;
foreach(lc, selectStmt->valuesLists)
{
@ -573,13 +575,26 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
* We must assign collations now because assign_query_collations
* doesn't process rangetable entries. We just assign all the
* collations independently in each row, and don't worry about
* whether they are consistent vertically either.
* whether they are consistent vertically. The outer INSERT query
* isn't going to care about the collations of the VALUES columns,
* so it's not worth the effort to identify a common collation for
* each one here. (But note this does have one user-visible
* consequence: INSERT ... VALUES won't complain about conflicting
* explicit COLLATEs in a column, whereas the same VALUES
* construct in another context would complain.)
*/
assign_list_collations(pstate, sublist);
exprsLists = lappend(exprsLists, sublist);
}
/*
* Although we don't really need collation info, let's just make sure
* we provide a correctly-sized list in the VALUES RTE.
*/
for (i = 0; i < sublist_length; i++)
collations = lappend_oid(collations, InvalidOid);
/*
* There mustn't have been any table references in the expressions,
* else strange things would happen, like Cartesian products of those
@ -610,7 +625,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
/*
* Generate the VALUES RTE
*/
rte = addRangeTableEntryForValues(pstate, exprsLists, NULL, true);
rte = addRangeTableEntryForValues(pstate, exprsLists, collations,
NULL, true);
rtr = makeNode(RangeTblRef);
/* assume new rte is at end */
rtr->rtindex = list_length(pstate->p_rtable);
@ -989,11 +1005,10 @@ static Query *
transformValuesClause(ParseState *pstate, SelectStmt *stmt)
{
Query *qry = makeNode(Query);
List *exprsLists = NIL;
List *exprsLists;
List *collations;
List **colexprs = NULL;
Oid *coltypes = NULL;
int sublist_length = -1;
List *newExprsLists;
RangeTblEntry *rte;
RangeTblRef *rtr;
ListCell *lc;
@ -1021,9 +1036,13 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
}
/*
* For each row of VALUES, transform the raw expressions and gather type
* information. This is also a handy place to reject DEFAULT nodes, which
* the grammar allows for simplicity.
* For each row of VALUES, transform the raw expressions. This is also a
* handy place to reject DEFAULT nodes, which the grammar allows for
* simplicity.
*
* Note that the intermediate representation we build is column-organized
* not row-organized. That simplifies the type and collation processing
* below.
*/
foreach(lc, stmt->valuesLists)
{
@ -1041,9 +1060,8 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
{
/* Remember post-transformation length of first sublist */
sublist_length = list_length(sublist);
/* and allocate arrays for per-column info */
/* and allocate array for per-column lists */
colexprs = (List **) palloc0(sublist_length * sizeof(List *));
coltypes = (Oid *) palloc0(sublist_length * sizeof(Oid));
}
else if (sublist_length != list_length(sublist))
{
@ -1054,8 +1072,6 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
exprLocation((Node *) sublist))));
}
exprsLists = lappend(exprsLists, sublist);
/* Check for DEFAULT and build per-column expression lists */
i = 0;
foreach(lc2, sublist)
@ -1070,48 +1086,77 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
colexprs[i] = lappend(colexprs[i], col);
i++;
}
/* Release sub-list's cells to save memory */
list_free(sublist);
}
/*
* Now resolve the common types of the columns, and coerce everything to
* those types.
* those types. Then identify the common collation, if any, of each
* column.
*
* We must do collation processing now because (1) assign_query_collations
* doesn't process rangetable entries, and (2) we need to label the VALUES
* RTE with column collations for use in the outer query. We don't
* consider conflict of implicit collations to be an error here; instead
* the column will just show InvalidOid as its collation, and you'll get
* a failure later if that results in failure to resolve a collation.
*
* Note we modify the per-column expression lists in-place.
*/
collations = NIL;
for (i = 0; i < sublist_length; i++)
{
coltypes[i] = select_common_type(pstate, colexprs[i], "VALUES", NULL);
}
Oid coltype;
Oid colcoll;
newExprsLists = NIL;
foreach(lc, exprsLists)
{
List *sublist = (List *) lfirst(lc);
List *newsublist = NIL;
coltype = select_common_type(pstate, colexprs[i], "VALUES", NULL);
i = 0;
foreach(lc2, sublist)
foreach(lc, colexprs[i])
{
Node *col = (Node *) lfirst(lc2);
Node *col = (Node *) lfirst(lc);
col = coerce_to_common_type(pstate, col, coltypes[i], "VALUES");
newsublist = lappend(newsublist, col);
i++;
col = coerce_to_common_type(pstate, col, coltype, "VALUES");
lfirst(lc) = (void *) col;
}
/*
* We must assign collations now because assign_query_collations
* doesn't process rangetable entries. We just assign all the
* collations independently in each row, and don't worry about whether
* they are consistent vertically either.
*/
assign_list_collations(pstate, newsublist);
colcoll = select_common_collation(pstate, colexprs[i], true);
newExprsLists = lappend(newExprsLists, newsublist);
collations = lappend_oid(collations, colcoll);
}
/*
* Finally, rearrange the coerced expressions into row-organized lists.
*/
exprsLists = NIL;
foreach(lc, colexprs[0])
{
Node *col = (Node *) lfirst(lc);
List *sublist;
sublist = list_make1(col);
exprsLists = lappend(exprsLists, sublist);
}
list_free(colexprs[0]);
for (i = 1; i < sublist_length; i++)
{
forboth(lc, colexprs[i], lc2, exprsLists)
{
Node *col = (Node *) lfirst(lc);
List *sublist = lfirst(lc2);
/* sublist pointer in exprsLists won't need adjustment */
(void) lappend(sublist, col);
}
list_free(colexprs[i]);
}
/*
* Generate the VALUES RTE
*/
rte = addRangeTableEntryForValues(pstate, newExprsLists, NULL, true);
rte = addRangeTableEntryForValues(pstate, exprsLists, collations,
NULL, true);
rtr = makeNode(RangeTblRef);
/* assume new rte is at end */
rtr->rtindex = list_length(pstate->p_rtable);
@ -1164,7 +1209,7 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES must not contain table references"),
parser_errposition(pstate,
locate_var_of_level((Node *) newExprsLists, 0))));
locate_var_of_level((Node *) exprsLists, 0))));
/*
* Another thing we can't currently support is NEW/OLD references in rules
@ -1173,13 +1218,13 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
* This is a shame. FIXME
*/
if (list_length(pstate->p_rtable) != 1 &&
contain_vars_of_level((Node *) newExprsLists, 0))
contain_vars_of_level((Node *) exprsLists, 0))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES must not contain OLD or NEW references"),
errhint("Use SELECT ... UNION ALL ... instead."),
parser_errposition(pstate,
locate_var_of_level((Node *) newExprsLists, 0))));
locate_var_of_level((Node *) exprsLists, 0))));
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, NULL);
@ -1191,13 +1236,13 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in VALUES"),
parser_errposition(pstate,
locate_agg_of_level((Node *) newExprsLists, 0))));
locate_agg_of_level((Node *) exprsLists, 0))));
if (pstate->p_hasWindowFuncs)
ereport(ERROR,
(errcode(ERRCODE_WINDOWING_ERROR),
errmsg("cannot use window function in VALUES"),
parser_errposition(pstate,
locate_windowfunc((Node *) newExprsLists))));
locate_windowfunc((Node *) exprsLists))));
assign_query_collations(pstate, qry);

View File

@ -1220,6 +1220,7 @@ addRangeTableEntryForFunction(ParseState *pstate,
RangeTblEntry *
addRangeTableEntryForValues(ParseState *pstate,
List *exprs,
List *collations,
Alias *alias,
bool inFromCl)
{
@ -1233,6 +1234,7 @@ addRangeTableEntryForValues(ParseState *pstate,
rte->relid = InvalidOid;
rte->subquery = NULL;
rte->values_lists = exprs;
rte->values_collations = collations;
rte->alias = alias;
eref = alias ? copyObject(alias) : makeAlias(refname, NIL);
@ -1657,7 +1659,9 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
ListCell *l3;
int attnum = 0;
forthree(l1, rte->funccoltypes, l2, rte->funccoltypmods, l3, rte->funccolcollations)
forthree(l1, rte->funccoltypes,
l2, rte->funccoltypmods,
l3, rte->funccolcollations)
{
Oid attrtype = lfirst_oid(l1);
int32 attrtypmod = lfirst_int(l2);
@ -1687,12 +1691,15 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
{
/* Values RTE */
ListCell *aliasp_item = list_head(rte->eref->colnames);
ListCell *lc;
ListCell *lcv;
ListCell *lcc;
varattno = 0;
foreach(lc, (List *) linitial(rte->values_lists))
forboth(lcv, (List *) linitial(rte->values_lists),
lcc, rte->values_collations)
{
Node *col = (Node *) lfirst(lc);
Node *col = (Node *) lfirst(lcv);
Oid colcollation = lfirst_oid(lcc);
varattno++;
if (colnames)
@ -1712,7 +1719,7 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
varnode = makeVar(rtindex, varattno,
exprType(col),
exprTypmod(col),
exprCollation(col),
colcollation,
sublevels_up);
varnode->location = location;
*colvars = lappend(*colvars, varnode);
@ -1789,7 +1796,9 @@ expandRTE(RangeTblEntry *rte, int rtindex, int sublevels_up,
ListCell *lcc;
varattno = 0;
forthree(lct, rte->ctecoltypes, lcm, rte->ctecoltypmods, lcc, rte->ctecolcollations)
forthree(lct, rte->ctecoltypes,
lcm, rte->ctecoltypmods,
lcc, rte->ctecolcollations)
{
Oid coltype = lfirst_oid(lct);
int32 coltypmod = lfirst_int(lcm);
@ -2116,6 +2125,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
case RTE_VALUES:
{
/* Values RTE --- get type info from first sublist */
/* collation is stored separately, though */
List *collist = (List *) linitial(rte->values_lists);
Node *col;
@ -2125,7 +2135,7 @@ get_rte_attribute_type(RangeTblEntry *rte, AttrNumber attnum,
col = (Node *) list_nth(collist, attnum - 1);
*vartype = exprType(col);
*vartypmod = exprTypmod(col);
*varcollid = exprCollation(col);
*varcollid = list_nth_oid(rte->values_collations, attnum - 1);
}
break;
case RTE_JOIN: