1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Restructure SELECT INTO's parsetree representation into CreateTableAsStmt.

Making this operation look like a utility statement seems generally a good
idea, and particularly so in light of the desire to provide command
triggers for utility statements.  The original choice of representing it as
SELECT with an IntoClause appendage had metastasized into rather a lot of
places, unfortunately, so that this patch is a great deal more complicated
than one might at first expect.

In particular, keeping EXPLAIN working for SELECT INTO and CREATE TABLE AS
subcommands required restructuring some EXPLAIN-related APIs.  Add-on code
that calls ExplainOnePlan or ExplainOneUtility, or uses
ExplainOneQuery_hook, will need adjustment.

Also, the cases PREPARE ... SELECT INTO and CREATE RULE ... SELECT INTO,
which formerly were accepted though undocumented, are no longer accepted.
The PREPARE case can be replaced with use of CREATE TABLE AS EXECUTE.
The CREATE RULE case doesn't seem to have much real-world use (since the
rule would work only once before failing with "table already exists"),
so we'll not bother with that one.

Both SELECT INTO and CREATE TABLE AS still return a command tag of
"SELECT nnnn".  There was some discussion of returning "CREATE TABLE nnnn",
but for the moment backwards compatibility wins the day.

Andres Freund and Tom Lane
This commit is contained in:
Tom Lane
2012-03-19 21:37:19 -04:00
parent 77503a7638
commit 9dbf2b7d75
47 changed files with 963 additions and 729 deletions

View File

@@ -37,27 +37,20 @@
*/
#include "postgres.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
#include "catalog/toasting.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/execdebug.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@@ -90,12 +83,6 @@ static char *ExecBuildSlotValueDescription(TupleTableSlot *slot,
int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
Plan *planTree);
static void OpenIntoRel(QueryDesc *queryDesc);
static void CloseIntoRel(QueryDesc *queryDesc);
static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
static void intorel_shutdown(DestReceiver *self);
static void intorel_destroy(DestReceiver *self);
/* end of local decls */
@@ -174,11 +161,9 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
case CMD_SELECT:
/*
* SELECT INTO, SELECT FOR UPDATE/SHARE and modifying CTEs need to
* mark tuples
* SELECT FOR UPDATE/SHARE and modifying CTEs need to mark tuples
*/
if (queryDesc->plannedstmt->intoClause != NULL ||
queryDesc->plannedstmt->rowMarks != NIL ||
if (queryDesc->plannedstmt->rowMarks != NIL ||
queryDesc->plannedstmt->hasModifyingCTE)
estate->es_output_cid = GetCurrentCommandId(true);
@@ -309,13 +294,6 @@ standard_ExecutorRun(QueryDesc *queryDesc,
if (sendTuples)
(*dest->rStartup) (dest, operation, queryDesc->tupDesc);
/*
* if it's CREATE TABLE AS ... WITH NO DATA, skip plan execution
*/
if (estate->es_select_into &&
queryDesc->plannedstmt->intoClause->skipData)
direction = NoMovementScanDirection;
/*
* run plan
*/
@@ -451,12 +429,6 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
ExecEndPlan(queryDesc->planstate, estate);
/*
* Close the SELECT INTO relation if any
*/
if (estate->es_select_into)
CloseIntoRel(queryDesc);
/* do away with our snapshots */
UnregisterSnapshot(estate->es_snapshot);
UnregisterSnapshot(estate->es_crosscheck_snapshot);
@@ -706,15 +678,6 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
{
ListCell *l;
/*
* CREATE TABLE AS or SELECT INTO?
*
* XXX should we allow this if the destination is temp? Considering that
* it would still require catalog changes, probably not.
*/
if (plannedstmt->intoClause != NULL)
PreventCommandIfReadOnly(CreateCommandTag((Node *) plannedstmt));
/* Fail if write permissions are requested on any non-temp table */
foreach(l, plannedstmt->rtable)
{
@@ -863,18 +826,6 @@ InitPlan(QueryDesc *queryDesc, int eflags)
estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
}
/*
* Detect whether we're doing SELECT INTO. If so, set the es_into_oids
* flag appropriately so that the plan tree will be initialized with the
* correct tuple descriptors. (Other SELECT INTO stuff comes later.)
*/
estate->es_select_into = false;
if (operation == CMD_SELECT && plannedstmt->intoClause != NULL)
{
estate->es_select_into = true;
estate->es_into_oids = interpretOidsOption(plannedstmt->intoClause->options);
}
/*
* Initialize the executor's tuple table to empty.
*/
@@ -926,9 +877,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
planstate = ExecInitNode(plan, estate, eflags);
/*
* Get the tuple descriptor describing the type of tuples to return. (this
* is especially important if we are creating a relation with "SELECT
* INTO")
* Get the tuple descriptor describing the type of tuples to return.
*/
tupType = ExecGetResultType(planstate);
@@ -968,16 +917,6 @@ InitPlan(QueryDesc *queryDesc, int eflags)
queryDesc->tupDesc = tupType;
queryDesc->planstate = planstate;
/*
* If doing SELECT INTO, initialize the "into" relation. We must wait
* till now so we have the "clean" result tuple type to create the new
* table from.
*
* If EXPLAIN, skip creating the "into" relation.
*/
if (estate->es_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
OpenIntoRel(queryDesc);
}
/*
@@ -1230,7 +1169,7 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
/*
* ExecContextForcesOids
*
* This is pretty grotty: when doing INSERT, UPDATE, or SELECT INTO,
* This is pretty grotty: when doing INSERT, UPDATE, or CREATE TABLE AS,
* we need to ensure that result tuples have space for an OID iff they are
* going to be stored into a relation that has OIDs. In other contexts
* we are free to choose whether to leave space for OIDs in result tuples
@@ -1255,9 +1194,9 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
* the ModifyTable node, so ModifyTable has to set es_result_relation_info
* while initializing each subplan.
*
* SELECT INTO is even uglier, because we don't have the INTO relation's
* descriptor available when this code runs; we have to look aside at a
* flag set by InitPlan().
* CREATE TABLE AS is even uglier, because we don't have the target relation's
* descriptor available when this code runs; we have to look aside at the
* flags passed to ExecutorStart().
*/
bool
ExecContextForcesOids(PlanState *planstate, bool *hasoids)
@@ -1275,9 +1214,14 @@ ExecContextForcesOids(PlanState *planstate, bool *hasoids)
}
}
if (planstate->state->es_select_into)
if (planstate->state->es_top_eflags & EXEC_FLAG_WITH_OIDS)
{
*hasoids = planstate->state->es_into_oids;
*hasoids = true;
return true;
}
if (planstate->state->es_top_eflags & EXEC_FLAG_WITHOUT_OIDS)
{
*hasoids = false;
return true;
}
@@ -2290,8 +2234,6 @@ EvalPlanQualStart(EPQState *epqstate, EState *parentestate, Plan *planTree)
estate->es_rowMarks = parentestate->es_rowMarks;
estate->es_top_eflags = parentestate->es_top_eflags;
estate->es_instrument = parentestate->es_instrument;
estate->es_select_into = parentestate->es_select_into;
estate->es_into_oids = parentestate->es_into_oids;
/* es_auxmodifytables must NOT be copied */
/*
@@ -2423,351 +2365,3 @@ EvalPlanQualEnd(EPQState *epqstate)
epqstate->planstate = NULL;
epqstate->origslot = NULL;
}
/*
* Support for SELECT INTO (a/k/a CREATE TABLE AS)
*
* We implement SELECT INTO by diverting SELECT's normal output with
* a specialized DestReceiver type.
*/
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
EState *estate; /* EState we are working with */
DestReceiver *origdest; /* QueryDesc's original receiver */
Relation rel; /* Relation to write to */
int hi_options; /* heap_insert performance options */
BulkInsertState bistate; /* bulk insert state */
} DR_intorel;
/*
* OpenIntoRel --- actually create the SELECT INTO target relation
*
* This also replaces QueryDesc->dest with the special DestReceiver for
* SELECT INTO. We assume that the correct result tuple type has already
* been placed in queryDesc->tupDesc.
*/
static void
OpenIntoRel(QueryDesc *queryDesc)
{
IntoClause *into = queryDesc->plannedstmt->intoClause;
EState *estate = queryDesc->estate;
TupleDesc intoTupDesc = queryDesc->tupDesc;
Relation intoRelationDesc;
char *intoName;
Oid namespaceId;
Oid tablespaceId;
Datum reloptions;
Oid intoRelationId;
DR_intorel *myState;
RangeTblEntry *rte;
AttrNumber attnum;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
Assert(into);
/*
* XXX This code needs to be kept in sync with DefineRelation(). Maybe we
* should try to use that function instead.
*/
/*
* Check consistency of arguments
*/
if (into->onCommit != ONCOMMIT_NOOP
&& into->rel->relpersistence != RELPERSISTENCE_TEMP)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("ON COMMIT can only be used on temporary tables")));
{
AclResult aclresult;
int i;
for (i = 0; i < intoTupDesc->natts; i++)
{
Oid atttypid = intoTupDesc->attrs[i]->atttypid;
aclresult = pg_type_aclcheck(atttypid, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_TYPE,
format_type_be(atttypid));
}
}
/*
* If a column name list was specified in CREATE TABLE AS, override the
* column names derived from the query. (Too few column names are OK, too
* many are not.) It would probably be all right to scribble directly on
* the query's result tupdesc, but let's be safe and make a copy.
*/
if (into->colNames)
{
ListCell *lc;
intoTupDesc = CreateTupleDescCopy(intoTupDesc);
attnum = 1;
foreach(lc, into->colNames)
{
char *colname = strVal(lfirst(lc));
if (attnum > intoTupDesc->natts)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("CREATE TABLE AS specifies too many column names")));
namestrcpy(&(intoTupDesc->attrs[attnum - 1]->attname), colname);
attnum++;
}
}
/*
* Find namespace to create in, check its permissions, lock it against
* concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the
* selected namespace is temporary.
*/
intoName = into->rel->relname;
namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock,
NULL);
/*
* Security check: disallow creating temp tables from security-restricted
* code. This is needed because calling code might not expect untrusted
* tables to appear in pg_temp at the front of its search path.
*/
if (into->rel->relpersistence == RELPERSISTENCE_TEMP
&& InSecurityRestrictedOperation())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("cannot create temporary table within security-restricted operation")));
/*
* Select tablespace to use. If not specified, use default tablespace
* (which may in turn default to database's default).
*/
if (into->tableSpaceName)
{
tablespaceId = get_tablespace_oid(into->tableSpaceName, false);
}
else
{
tablespaceId = GetDefaultTablespace(into->rel->relpersistence);
/* note InvalidOid is OK in this case */
}
/* Check permissions except when using the database's default space */
if (OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
{
AclResult aclresult;
aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
get_tablespace_name(tablespaceId));
}
/* Parse and validate any reloptions */
reloptions = transformRelOptions((Datum) 0,
into->options,
NULL,
validnsps,
true,
false);
(void) heap_reloptions(RELKIND_RELATION, reloptions, true);
/* Now we can actually create the new relation */
intoRelationId = heap_create_with_catalog(intoName,
namespaceId,
tablespaceId,
InvalidOid,
InvalidOid,
InvalidOid,
GetUserId(),
intoTupDesc,
NIL,
RELKIND_RELATION,
into->rel->relpersistence,
false,
false,
true,
0,
into->onCommit,
reloptions,
true,
allowSystemTableMods);
Assert(intoRelationId != InvalidOid);
/*
* Advance command counter so that the newly-created relation's catalog
* tuples will be visible to heap_open.
*/
CommandCounterIncrement();
/*
* If necessary, create a TOAST table for the INTO relation. Note that
* AlterTableCreateToastTable ends with CommandCounterIncrement(), so that
* the TOAST table will be visible for insertion.
*/
reloptions = transformRelOptions((Datum) 0,
into->options,
"toast",
validnsps,
true,
false);
(void) heap_reloptions(RELKIND_TOASTVALUE, reloptions, true);
AlterTableCreateToastTable(intoRelationId, reloptions);
/*
* And open the constructed table for writing.
*/
intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
/*
* Check INSERT permission on the constructed table.
*/
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = intoRelationId;
rte->relkind = RELKIND_RELATION;
rte->requiredPerms = ACL_INSERT;
for (attnum = 1; attnum <= intoTupDesc->natts; attnum++)
rte->modifiedCols = bms_add_member(rte->modifiedCols,
attnum - FirstLowInvalidHeapAttributeNumber);
ExecCheckRTPerms(list_make1(rte), true);
/*
* Now replace the query's DestReceiver with one for SELECT INTO
*/
myState = (DR_intorel *) CreateDestReceiver(DestIntoRel);
Assert(myState->pub.mydest == DestIntoRel);
myState->estate = estate;
myState->origdest = queryDesc->dest;
myState->rel = intoRelationDesc;
queryDesc->dest = (DestReceiver *) myState;
/*
* We can skip WAL-logging the insertions, unless PITR or streaming
* replication is in use. We can skip the FSM in any case.
*/
myState->hi_options = HEAP_INSERT_SKIP_FSM |
(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
myState->bistate = GetBulkInsertState();
/* Not using WAL requires smgr_targblock be initially invalid */
Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
}
/*
* CloseIntoRel --- clean up SELECT INTO at ExecutorEnd time
*/
static void
CloseIntoRel(QueryDesc *queryDesc)
{
DR_intorel *myState = (DR_intorel *) queryDesc->dest;
/*
* OpenIntoRel might never have gotten called, and we also want to guard
* against double destruction.
*/
if (myState && myState->pub.mydest == DestIntoRel)
{
FreeBulkInsertState(myState->bistate);
/* If we skipped using WAL, must heap_sync before commit */
if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
heap_sync(myState->rel);
/* close rel, but keep lock until commit */
heap_close(myState->rel, NoLock);
/* restore the receiver belonging to executor's caller */
queryDesc->dest = myState->origdest;
/* might as well invoke my destructor */
intorel_destroy((DestReceiver *) myState);
}
}
/*
* CreateIntoRelDestReceiver -- create a suitable DestReceiver object
*/
DestReceiver *
CreateIntoRelDestReceiver(void)
{
DR_intorel *self = (DR_intorel *) palloc0(sizeof(DR_intorel));
self->pub.receiveSlot = intorel_receive;
self->pub.rStartup = intorel_startup;
self->pub.rShutdown = intorel_shutdown;
self->pub.rDestroy = intorel_destroy;
self->pub.mydest = DestIntoRel;
/* private fields will be set by OpenIntoRel */
return (DestReceiver *) self;
}
/*
* intorel_startup --- executor startup
*/
static void
intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
/* no-op */
}
/*
* intorel_receive --- receive one tuple
*/
static void
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
HeapTuple tuple;
/*
* get the heap tuple out of the tuple table slot, making sure we have a
* writable copy
*/
tuple = ExecMaterializeSlot(slot);
/*
* force assignment of new OID (see comments in ExecInsert)
*/
if (myState->rel->rd_rel->relhasoids)
HeapTupleSetOid(tuple, InvalidOid);
heap_insert(myState->rel,
tuple,
myState->estate->es_output_cid,
myState->hi_options,
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
}
/*
* intorel_shutdown --- executor end
*/
static void
intorel_shutdown(DestReceiver *self)
{
/* no-op */
}
/*
* intorel_destroy --- release DestReceiver object
*/
static void
intorel_destroy(DestReceiver *self)
{
pfree(self);
}