1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-05 07:21:24 +03:00

Add support for multi-row VALUES clauses as part of INSERT statements

(e.g. "INSERT ... VALUES (...), (...), ...") and elsewhere as allowed
by the spec. (e.g. similar to a FROM clause subselect). initdb required.
Joe Conway and Tom Lane.
This commit is contained in:
Joe Conway
2006-08-02 01:59:48 +00:00
parent d307c428cb
commit 9caafda579
40 changed files with 1877 additions and 313 deletions

View File

@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/parser/analyze.c,v 1.340 2006/07/14 14:52:21 momjian Exp $
* $PostgreSQL: pgsql/src/backend/parser/analyze.c,v 1.341 2006/08/02 01:59:46 joe Exp $
*
*-------------------------------------------------------------------------
*/
@ -98,10 +98,13 @@ static Query *transformViewStmt(ParseState *pstate, ViewStmt *stmt,
static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt);
static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
List **extras_before, List **extras_after);
static List *transformInsertRow(ParseState *pstate, List *exprlist,
List *stmtcols, List *icolumns, List *attrnos);
static Query *transformIndexStmt(ParseState *pstate, IndexStmt *stmt);
static Query *transformRuleStmt(ParseState *query, RuleStmt *stmt,
List **extras_before, List **extras_after);
static Query *transformSelectStmt(ParseState *pstate, SelectStmt *stmt);
static Query *transformValuesClause(ParseState *pstate, SelectStmt *stmt);
static Query *transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt);
static Node *transformSetOperationTree(ParseState *pstate, SelectStmt *stmt);
static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt);
@ -367,12 +370,16 @@ transformStmt(ParseState *pstate, Node *parseTree,
break;
case T_SelectStmt:
if (((SelectStmt *) parseTree)->op == SETOP_NONE)
result = transformSelectStmt(pstate,
(SelectStmt *) parseTree);
else
result = transformSetOperationStmt(pstate,
(SelectStmt *) parseTree);
{
SelectStmt *n = (SelectStmt *) parseTree;
if (n->valuesLists)
result = transformValuesClause(pstate, n);
else if (n->op == SETOP_NONE)
result = transformSelectStmt(pstate, n);
else
result = transformSetOperationStmt(pstate, n);
}
break;
case T_DeclareCursorStmt:
@ -510,19 +517,30 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
List **extras_before, List **extras_after)
{
Query *qry = makeNode(Query);
Query *selectQuery = NULL;
SelectStmt *selectStmt = (SelectStmt *) stmt->selectStmt;
List *exprList = NIL;
bool isGeneralSelect;
List *sub_rtable;
List *sub_relnamespace;
List *sub_varnamespace;
List *icolumns;
List *attrnos;
RangeTblEntry *rte;
RangeTblRef *rtr;
ListCell *icols;
ListCell *attnos;
ListCell *tl;
ListCell *lc;
qry->commandType = CMD_INSERT;
pstate->p_is_insert = true;
/*
* We have three cases to deal with: DEFAULT VALUES (selectStmt == NULL),
* VALUES list, or general SELECT input. We special-case VALUES, both
* for efficiency and so we can handle DEFAULT specifications.
*/
isGeneralSelect = (selectStmt && selectStmt->valuesLists == NIL);
/*
* If a non-nil rangetable/namespace was passed in, and we are doing
* INSERT/SELECT, arrange to pass the rangetable/namespace down to the
@ -532,7 +550,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
* The SELECT's joinlist is not affected however. We must do this before
* adding the target table to the INSERT's rtable.
*/
if (stmt->selectStmt)
if (isGeneralSelect)
{
sub_rtable = pstate->p_rtable;
pstate->p_rtable = NIL;
@ -557,10 +575,23 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
qry->resultRelation = setTargetTable(pstate, stmt->relation,
false, false, ACL_INSERT);
/* Validate stmt->cols list, or build default list if no list given */
icolumns = checkInsertTargets(pstate, stmt->cols, &attrnos);
Assert(list_length(icolumns) == list_length(attrnos));
/*
* Is it INSERT ... SELECT or INSERT ... VALUES?
* Determine which variant of INSERT we have.
*/
if (stmt->selectStmt)
if (selectStmt == NULL)
{
/*
* We have INSERT ... DEFAULT VALUES. We can handle this case by
* emitting an empty targetlist --- all columns will be defaulted
* when the planner expands the targetlist.
*/
exprList = NIL;
}
else if (isGeneralSelect)
{
/*
* We make the sub-pstate a child of the outer pstate so that it can
@ -570,8 +601,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
* see.
*/
ParseState *sub_pstate = make_parsestate(pstate);
RangeTblEntry *rte;
RangeTblRef *rtr;
Query *selectQuery;
/*
* Process the source SELECT.
@ -617,8 +647,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
pstate->p_joinlist = lappend(pstate->p_joinlist, rtr);
/*----------
* Generate a targetlist for the INSERT that selects all the
* non-resjunk columns from the subquery. (We need this to be
* Generate an expression list for the INSERT that selects all the
* non-resjunk columns from the subquery. (INSERT's tlist must be
* separate from the subquery's tlist because we may add columns,
* insert datatype coercions, etc.)
*
@ -630,10 +660,10 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
* INSERT INTO foo SELECT 'bar', ... FROM baz
*----------
*/
qry->targetList = NIL;
foreach(tl, selectQuery->targetList)
exprList = NIL;
foreach(lc, selectQuery->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(tl);
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Expr *expr;
if (tle->resjunk)
@ -648,82 +678,220 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt,
exprType((Node *) tle->expr),
exprTypmod((Node *) tle->expr),
0);
tle = makeTargetEntry(expr,
(AttrNumber) pstate->p_next_resno++,
tle->resname,
false);
qry->targetList = lappend(qry->targetList, tle);
exprList = lappend(exprList, expr);
}
/* Prepare row for assignment to target table */
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos);
}
else if (list_length(selectStmt->valuesLists) > 1)
{
/*
* Process INSERT ... VALUES with multiple VALUES sublists.
* We generate a VALUES RTE holding the transformed expression
* lists, and build up a targetlist containing Vars that reference
* the VALUES RTE.
*/
List *exprsLists = NIL;
int sublist_length = -1;
foreach(lc, selectStmt->valuesLists)
{
List *sublist = (List *) lfirst(lc);
/* Do basic expression transformation (same as a ROW() expr) */
sublist = transformExpressionList(pstate, sublist);
/*
* All the sublists must be the same length, *after* transformation
* (which might expand '*' into multiple items). The VALUES RTE
* can't handle anything different.
*/
if (sublist_length < 0)
{
/* Remember post-transformation length of first sublist */
sublist_length = list_length(sublist);
}
else if (sublist_length != list_length(sublist))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("VALUES lists must all be the same length")));
}
/* Prepare row for assignment to target table */
sublist = transformInsertRow(pstate, sublist,
stmt->cols,
icolumns, attrnos);
exprsLists = lappend(exprsLists, sublist);
}
/*
* There mustn't have been any table references in the expressions,
* else strange things would happen, like Cartesian products of
* those tables with the VALUES list ...
*/
if (pstate->p_joinlist != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES must not contain table references")));
/*
* Another thing we can't currently support is NEW/OLD references
* in rules --- seems we'd need something like SQL99's LATERAL
* construct to ensure that the values would be available while
* evaluating the VALUES RTE. This is a shame. FIXME
*/
if (contain_vars_of_level((Node *) exprsLists, 0))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES must not contain OLD or NEW references")));
/*
* Generate the VALUES RTE
*/
rte = addRangeTableEntryForValues(pstate, exprsLists, NULL, true);
rtr = makeNode(RangeTblRef);
/* assume new rte is at end */
rtr->rtindex = list_length(pstate->p_rtable);
Assert(rte == rt_fetch(rtr->rtindex, pstate->p_rtable));
pstate->p_joinlist = lappend(pstate->p_joinlist, rtr);
/*
* Generate list of Vars referencing the RTE
*/
expandRTE(rte, rtr->rtindex, 0, false, NULL, &exprList);
}
else
{
/*
* For INSERT ... VALUES, transform the given list of values to form a
* targetlist for the INSERT.
/*----------
* Process INSERT ... VALUES with a single VALUES sublist.
* We treat this separately for efficiency and for historical
* compatibility --- specifically, allowing table references,
* such as
* INSERT INTO foo VALUES(bar.*)
*
* The sublist is just computed directly as the Query's targetlist,
* with no VALUES RTE. So it works just like SELECT without FROM.
*----------
*/
qry->targetList = transformTargetList(pstate, stmt->targetList);
List *valuesLists = selectStmt->valuesLists;
Assert(list_length(valuesLists) == 1);
/* Do basic expression transformation (same as a ROW() expr) */
exprList = transformExpressionList(pstate,
(List *) linitial(valuesLists));
/* Prepare row for assignment to target table */
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos);
}
/*
* Now we are done with SELECT-like processing, and can get on with
* transforming the target list to match the INSERT target columns.
*/
/* Prepare to assign non-conflicting resnos to resjunk attributes */
if (pstate->p_next_resno <= pstate->p_target_relation->rd_rel->relnatts)
pstate->p_next_resno = pstate->p_target_relation->rd_rel->relnatts + 1;
/* Validate stmt->cols list, or build default list if no list given */
icolumns = checkInsertTargets(pstate, stmt->cols, &attrnos);
/*
* Prepare columns for assignment to target table.
* Generate query's target list using the computed list of expressions.
*/
qry->targetList = NIL;
icols = list_head(icolumns);
attnos = list_head(attrnos);
foreach(tl, qry->targetList)
foreach(lc, exprList)
{
TargetEntry *tle = (TargetEntry *) lfirst(tl);
Expr *expr = (Expr *) lfirst(lc);
ResTarget *col;
if (icols == NULL || attnos == NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("INSERT has more expressions than target columns")));
TargetEntry *tle;
col = (ResTarget *) lfirst(icols);
Assert(IsA(col, ResTarget));
Assert(!tle->resjunk);
updateTargetListEntry(pstate, tle, col->name, lfirst_int(attnos),
col->indirection, col->location);
tle = makeTargetEntry(expr,
(AttrNumber) lfirst_int(attnos),
col->name,
false);
qry->targetList = lappend(qry->targetList, tle);
icols = lnext(icols);
attnos = lnext(attnos);
}
/*
* Ensure that the targetlist has the same number of entries that were
* present in the columns list. Don't do the check unless an explicit
* columns list was given, though.
*/
if (stmt->cols != NIL && (icols != NULL || attnos != NULL))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("INSERT has more target columns than expressions")));
/* done building the range table and jointree */
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, NULL);
qry->hasSubLinks = pstate->p_hasSubLinks;
qry->hasAggs = pstate->p_hasAggs;
/* aggregates not allowed (but subselects are okay) */
if (pstate->p_hasAggs)
parseCheckAggregates(pstate, qry);
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in VALUES")));
return qry;
}
/*
* Prepare an INSERT row for assignment to the target table.
*
* The row might be either a VALUES row, or variables referencing a
* sub-SELECT output.
*/
static List *
transformInsertRow(ParseState *pstate, List *exprlist,
List *stmtcols, List *icolumns, List *attrnos)
{
List *result;
ListCell *lc;
ListCell *icols;
ListCell *attnos;
/*
* Check length of expr list. It must not have more expressions than
* there are target columns. We allow fewer, but only if no explicit
* columns list was given (the remaining columns are implicitly
* defaulted). Note we must check this *after* transformation because
* that could expand '*' into multiple items.
*/
if (list_length(exprlist) > list_length(icolumns))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("INSERT has more expressions than target columns")));
if (stmtcols != NIL &&
list_length(exprlist) < list_length(icolumns))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("INSERT has more target columns than expressions")));
/*
* Prepare columns for assignment to target table.
*/
result = NIL;
icols = list_head(icolumns);
attnos = list_head(attrnos);
foreach(lc, exprlist)
{
Expr *expr = (Expr *) lfirst(lc);
ResTarget *col;
col = (ResTarget *) lfirst(icols);
Assert(IsA(col, ResTarget));
expr = transformAssignedExpr(pstate, expr,
col->name,
lfirst_int(attnos),
col->indirection,
col->location);
result = lappend(result, expr);
icols = lnext(icols);
attnos = lnext(attnos);
}
return result;
}
/*
* transformCreateStmt -
* transforms the "create table" statement
@ -1933,6 +2101,187 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt)
return qry;
}
/*
* transformValuesClause -
* transforms a VALUES clause that's being used as a standalone SELECT
*
* We build a Query containing a VALUES RTE, rather as if one had written
* SELECT * FROM (VALUES ...)
*/
static Query *
transformValuesClause(ParseState *pstate, SelectStmt *stmt)
{
Query *qry = makeNode(Query);
List *exprsLists = NIL;
List **coltype_lists = NULL;
Oid *coltypes = NULL;
int sublist_length = -1;
List *newExprsLists;
RangeTblEntry *rte;
RangeTblRef *rtr;
ListCell *lc;
ListCell *lc2;
int i;
qry->commandType = CMD_SELECT;
/* Most SELECT stuff doesn't apply in a VALUES clause */
Assert(stmt->distinctClause == NIL);
Assert(stmt->into == NULL);
Assert(stmt->intoColNames == NIL);
Assert(stmt->targetList == NIL);
Assert(stmt->fromClause == NIL);
Assert(stmt->whereClause == NULL);
Assert(stmt->groupClause == NIL);
Assert(stmt->havingClause == NULL);
Assert(stmt->op == SETOP_NONE);
/*
* For each row of VALUES, transform the raw expressions and gather
* type information. This is also a handy place to reject DEFAULT
* nodes, which the grammar allows for simplicity.
*/
foreach(lc, stmt->valuesLists)
{
List *sublist = (List *) lfirst(lc);
/* Do basic expression transformation (same as a ROW() expr) */
sublist = transformExpressionList(pstate, sublist);
/*
* All the sublists must be the same length, *after* transformation
* (which might expand '*' into multiple items). The VALUES RTE
* can't handle anything different.
*/
if (sublist_length < 0)
{
/* Remember post-transformation length of first sublist */
sublist_length = list_length(sublist);
/* and allocate arrays for column-type info */
coltype_lists = (List **) palloc0(sublist_length * sizeof(List *));
coltypes = (Oid *) palloc0(sublist_length * sizeof(Oid));
}
else if (sublist_length != list_length(sublist))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("VALUES lists must all be the same length")));
}
exprsLists = lappend(exprsLists, sublist);
i = 0;
foreach(lc2, sublist)
{
Node *col = (Node *) lfirst(lc2);
if (IsA(col, SetToDefault))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("DEFAULT can only appear in a VALUES list within INSERT")));
coltype_lists[i] = lappend_oid(coltype_lists[i], exprType(col));
i++;
}
}
/*
* Now resolve the common types of the columns, and coerce everything
* to those types.
*/
for (i = 0; i < sublist_length; i++)
{
coltypes[i] = select_common_type(coltype_lists[i], "VALUES");
}
newExprsLists = NIL;
foreach(lc, exprsLists)
{
List *sublist = (List *) lfirst(lc);
List *newsublist = NIL;
i = 0;
foreach(lc2, sublist)
{
Node *col = (Node *) lfirst(lc2);
col = coerce_to_common_type(pstate, col, coltypes[i], "VALUES");
newsublist = lappend(newsublist, col);
i++;
}
newExprsLists = lappend(newExprsLists, newsublist);
}
/*
* Generate the VALUES RTE
*/
rte = addRangeTableEntryForValues(pstate, newExprsLists, NULL, true);
rtr = makeNode(RangeTblRef);
/* assume new rte is at end */
rtr->rtindex = list_length(pstate->p_rtable);
Assert(rte == rt_fetch(rtr->rtindex, pstate->p_rtable));
pstate->p_joinlist = lappend(pstate->p_joinlist, rtr);
/*
* Generate a targetlist as though expanding "*"
*/
Assert(pstate->p_next_resno == 1);
qry->targetList = expandRelAttrs(pstate, rte, rtr->rtindex, 0);
/*
* The grammar does allow attaching ORDER BY, LIMIT, and FOR UPDATE
* to a VALUES, so cope.
*/
qry->sortClause = transformSortClause(pstate,
stmt->sortClause,
&qry->targetList,
true /* fix unknowns */ );
qry->limitOffset = transformLimitClause(pstate, stmt->limitOffset,
"OFFSET");
qry->limitCount = transformLimitClause(pstate, stmt->limitCount,
"LIMIT");
if (stmt->lockingClause)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SELECT FOR UPDATE/SHARE cannot be applied to VALUES")));
/*
* There mustn't have been any table references in the expressions,
* else strange things would happen, like Cartesian products of
* those tables with the VALUES list. We have to check this after
* parsing ORDER BY et al since those could insert more junk.
*/
if (list_length(pstate->p_joinlist) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES must not contain table references")));
/*
* Another thing we can't currently support is NEW/OLD references
* in rules --- seems we'd need something like SQL99's LATERAL
* construct to ensure that the values would be available while
* evaluating the VALUES RTE. This is a shame. FIXME
*/
if (contain_vars_of_level((Node *) newExprsLists, 0))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VALUES must not contain OLD or NEW references")));
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, NULL);
qry->hasSubLinks = pstate->p_hasSubLinks;
/* aggregates not allowed (but subselects are okay) */
if (pstate->p_hasAggs)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in VALUES")));
return qry;
}
/*
* transformSetOperationsStmt -
* transforms a set-operations tree
@ -2931,6 +3280,11 @@ transformLockingClause(Query *qry, LockingClause *lc)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SELECT FOR UPDATE/SHARE cannot be applied to a function")));
break;
case RTE_VALUES:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SELECT FOR UPDATE/SHARE cannot be applied to VALUES")));
break;
default:
elog(ERROR, "unrecognized RTE type: %d",
(int) rte->rtekind);