mirror of
https://github.com/postgres/postgres.git
synced 2025-06-30 21:42:05 +03:00
Add INSERT/UPDATE/DELETE RETURNING, with basic docs and regression tests.
plpgsql support to come later. Along the way, convert execMain's SELECT INTO support into a DestReceiver, in order to eliminate some ugly special cases. Jonah Harris and Tom Lane
This commit is contained in:
@ -26,7 +26,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.277 2006/07/31 01:16:37 tgl Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.278 2006/08/12 02:52:04 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -43,6 +43,7 @@
|
||||
#include "commands/trigger.h"
|
||||
#include "executor/execdebug.h"
|
||||
#include "executor/instrument.h"
|
||||
#include "executor/nodeSubplan.h"
|
||||
#include "miscadmin.h"
|
||||
#include "optimizer/clauses.h"
|
||||
#include "parser/parse_clause.h"
|
||||
@ -75,14 +76,20 @@ static TupleTableSlot *ExecutePlan(EState *estate, PlanState *planstate,
|
||||
ScanDirection direction,
|
||||
DestReceiver *dest);
|
||||
static void ExecSelect(TupleTableSlot *slot,
|
||||
DestReceiver *dest,
|
||||
EState *estate);
|
||||
DestReceiver *dest, EState *estate);
|
||||
static void ExecInsert(TupleTableSlot *slot, ItemPointer tupleid,
|
||||
EState *estate);
|
||||
static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
|
||||
EState *estate);
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest, EState *estate);
|
||||
static void ExecDelete(ItemPointer tupleid,
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest, EState *estate);
|
||||
static void ExecUpdate(TupleTableSlot *slot, ItemPointer tupleid,
|
||||
EState *estate);
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest, EState *estate);
|
||||
static void ExecProcessReturning(ProjectionInfo *projectReturning,
|
||||
TupleTableSlot *tupleSlot,
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest);
|
||||
static TupleTableSlot *EvalPlanQualNext(EState *estate);
|
||||
static void EndEvalPlanQual(EState *estate);
|
||||
static void ExecCheckRTEPerms(RangeTblEntry *rte);
|
||||
@ -90,6 +97,12 @@ static void ExecCheckXactReadOnly(Query *parsetree);
|
||||
static void EvalPlanQualStart(evalPlanQual *epq, EState *estate,
|
||||
evalPlanQual *priorepq);
|
||||
static void EvalPlanQualStop(evalPlanQual *epq);
|
||||
static void OpenIntoRel(QueryDesc *queryDesc);
|
||||
static void CloseIntoRel(QueryDesc *queryDesc);
|
||||
static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
|
||||
static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
|
||||
static void intorel_shutdown(DestReceiver *self);
|
||||
static void intorel_destroy(DestReceiver *self);
|
||||
|
||||
/* end of local decls */
|
||||
|
||||
@ -185,6 +198,7 @@ ExecutorRun(QueryDesc *queryDesc,
|
||||
EState *estate;
|
||||
CmdType operation;
|
||||
DestReceiver *dest;
|
||||
bool sendTuples;
|
||||
TupleTableSlot *result;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
@ -207,12 +221,16 @@ ExecutorRun(QueryDesc *queryDesc,
|
||||
dest = queryDesc->dest;
|
||||
|
||||
/*
|
||||
* startup tuple receiver
|
||||
* startup tuple receiver, if we will be emitting tuples
|
||||
*/
|
||||
estate->es_processed = 0;
|
||||
estate->es_lastoid = InvalidOid;
|
||||
|
||||
(*dest->rStartup) (dest, operation, queryDesc->tupDesc);
|
||||
sendTuples = (operation == CMD_SELECT ||
|
||||
queryDesc->parsetree->returningList);
|
||||
|
||||
if (sendTuples)
|
||||
(*dest->rStartup) (dest, operation, queryDesc->tupDesc);
|
||||
|
||||
/*
|
||||
* run plan
|
||||
@ -228,9 +246,10 @@ ExecutorRun(QueryDesc *queryDesc,
|
||||
dest);
|
||||
|
||||
/*
|
||||
* shutdown receiver
|
||||
* shutdown tuple receiver, if we started it
|
||||
*/
|
||||
(*dest->rShutdown) (dest);
|
||||
if (sendTuples)
|
||||
(*dest->rShutdown) (dest);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
@ -264,6 +283,12 @@ ExecutorEnd(QueryDesc *queryDesc)
|
||||
|
||||
ExecEndPlan(queryDesc->planstate, estate);
|
||||
|
||||
/*
|
||||
* Close the SELECT INTO relation if any
|
||||
*/
|
||||
if (estate->es_select_into)
|
||||
CloseIntoRel(queryDesc);
|
||||
|
||||
/*
|
||||
* Must switch out of context before destroying it
|
||||
*/
|
||||
@ -449,8 +474,6 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
EState *estate = queryDesc->estate;
|
||||
PlanState *planstate;
|
||||
List *rangeTable;
|
||||
Relation intoRelationDesc;
|
||||
bool do_select_into;
|
||||
TupleDesc tupType;
|
||||
ListCell *l;
|
||||
|
||||
@ -534,13 +557,11 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
/*
|
||||
* Detect whether we're doing SELECT INTO. If so, set the es_into_oids
|
||||
* flag appropriately so that the plan tree will be initialized with the
|
||||
* correct tuple descriptors.
|
||||
* correct tuple descriptors. (Other SELECT INTO stuff comes later.)
|
||||
*/
|
||||
do_select_into = false;
|
||||
|
||||
estate->es_select_into = false;
|
||||
if (operation == CMD_SELECT && parseTree->into != NULL)
|
||||
{
|
||||
do_select_into = true;
|
||||
estate->es_select_into = true;
|
||||
estate->es_into_oids = interpretOidsOption(parseTree->intoOptions);
|
||||
}
|
||||
@ -581,7 +602,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
else
|
||||
nSlots += 1;
|
||||
if (operation != CMD_SELECT)
|
||||
nSlots++;
|
||||
nSlots++; /* for es_trig_tuple_slot */
|
||||
if (parseTree->returningLists)
|
||||
nSlots++; /* for RETURNING projection */
|
||||
|
||||
estate->es_tupleTable = ExecCreateTupleTable(nSlots);
|
||||
|
||||
@ -638,7 +661,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
}
|
||||
}
|
||||
if (!junk_filter_needed &&
|
||||
(operation == CMD_INSERT || do_select_into) &&
|
||||
(operation == CMD_INSERT || estate->es_select_into) &&
|
||||
ExecMayReturnRawTuples(planstate))
|
||||
junk_filter_needed = true;
|
||||
break;
|
||||
@ -712,6 +735,62 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
estate->es_junkFilter = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize RETURNING projections if needed.
|
||||
*/
|
||||
if (parseTree->returningLists)
|
||||
{
|
||||
TupleTableSlot *slot;
|
||||
ExprContext *econtext;
|
||||
ResultRelInfo *resultRelInfo;
|
||||
|
||||
/*
|
||||
* We set QueryDesc.tupDesc to be the RETURNING rowtype in this case.
|
||||
* We assume all the sublists will generate the same output tupdesc.
|
||||
*/
|
||||
tupType = ExecTypeFromTL((List *) linitial(parseTree->returningLists),
|
||||
false);
|
||||
|
||||
/* Set up a slot for the output of the RETURNING projection(s) */
|
||||
slot = ExecAllocTableSlot(estate->es_tupleTable);
|
||||
ExecSetSlotDescriptor(slot, tupType);
|
||||
/* Need an econtext too */
|
||||
econtext = CreateExprContext(estate);
|
||||
|
||||
/*
|
||||
* Build a projection for each result rel. Note that any SubPlans
|
||||
* in the RETURNING lists get attached to the topmost plan node.
|
||||
*/
|
||||
Assert(list_length(parseTree->returningLists) == estate->es_num_result_relations);
|
||||
resultRelInfo = estate->es_result_relations;
|
||||
foreach(l, parseTree->returningLists)
|
||||
{
|
||||
List *rlist = (List *) lfirst(l);
|
||||
List *rliststate;
|
||||
|
||||
rliststate = (List *) ExecInitExpr((Expr *) rlist, planstate);
|
||||
resultRelInfo->ri_projectReturning =
|
||||
ExecBuildProjectionInfo(rliststate, econtext, slot);
|
||||
resultRelInfo++;
|
||||
}
|
||||
/*
|
||||
* Because we already ran ExecInitNode() for the top plan node,
|
||||
* any subplans we just attached to it won't have been initialized;
|
||||
* so we have to do it here. (Ugly, but the alternatives seem worse.)
|
||||
*/
|
||||
foreach(l, planstate->subPlan)
|
||||
{
|
||||
SubPlanState *sstate = (SubPlanState *) lfirst(l);
|
||||
|
||||
Assert(IsA(sstate, SubPlanState));
|
||||
if (sstate->planstate == NULL) /* already inited? */
|
||||
ExecInitSubPlan(sstate, estate, eflags);
|
||||
}
|
||||
}
|
||||
|
||||
queryDesc->tupDesc = tupType;
|
||||
queryDesc->planstate = planstate;
|
||||
|
||||
/*
|
||||
* If doing SELECT INTO, initialize the "into" relation. We must wait
|
||||
* till now so we have the "clean" result tuple type to create the new
|
||||
@ -719,134 +798,8 @@ InitPlan(QueryDesc *queryDesc, int eflags)
|
||||
*
|
||||
* If EXPLAIN, skip creating the "into" relation.
|
||||
*/
|
||||
intoRelationDesc = NULL;
|
||||
|
||||
if (do_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
|
||||
{
|
||||
char *intoName;
|
||||
Oid namespaceId;
|
||||
Oid tablespaceId;
|
||||
Datum reloptions;
|
||||
AclResult aclresult;
|
||||
Oid intoRelationId;
|
||||
TupleDesc tupdesc;
|
||||
|
||||
/*
|
||||
* Check consistency of arguments
|
||||
*/
|
||||
if (parseTree->intoOnCommit != ONCOMMIT_NOOP && !parseTree->into->istemp)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("ON COMMIT can only be used on temporary tables")));
|
||||
|
||||
/*
|
||||
* find namespace to create in, check permissions
|
||||
*/
|
||||
intoName = parseTree->into->relname;
|
||||
namespaceId = RangeVarGetCreationNamespace(parseTree->into);
|
||||
|
||||
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
|
||||
ACL_CREATE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
|
||||
get_namespace_name(namespaceId));
|
||||
|
||||
/*
|
||||
* Select tablespace to use. If not specified, use default_tablespace
|
||||
* (which may in turn default to database's default).
|
||||
*/
|
||||
if (parseTree->intoTableSpaceName)
|
||||
{
|
||||
tablespaceId = get_tablespace_oid(parseTree->intoTableSpaceName);
|
||||
if (!OidIsValid(tablespaceId))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||
errmsg("tablespace \"%s\" does not exist",
|
||||
parseTree->intoTableSpaceName)));
|
||||
} else
|
||||
{
|
||||
tablespaceId = GetDefaultTablespace();
|
||||
/* note InvalidOid is OK in this case */
|
||||
}
|
||||
|
||||
/* Parse and validate any reloptions */
|
||||
reloptions = transformRelOptions((Datum) 0,
|
||||
parseTree->intoOptions,
|
||||
true,
|
||||
false);
|
||||
(void) heap_reloptions(RELKIND_RELATION, reloptions, true);
|
||||
|
||||
/* Check permissions except when using the database's default */
|
||||
if (OidIsValid(tablespaceId))
|
||||
{
|
||||
AclResult aclresult;
|
||||
|
||||
aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
|
||||
ACL_CREATE);
|
||||
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
|
||||
get_tablespace_name(tablespaceId));
|
||||
}
|
||||
|
||||
/*
|
||||
* have to copy tupType to get rid of constraints
|
||||
*/
|
||||
tupdesc = CreateTupleDescCopy(tupType);
|
||||
|
||||
intoRelationId = heap_create_with_catalog(intoName,
|
||||
namespaceId,
|
||||
tablespaceId,
|
||||
InvalidOid,
|
||||
GetUserId(),
|
||||
tupdesc,
|
||||
RELKIND_RELATION,
|
||||
false,
|
||||
true,
|
||||
0,
|
||||
parseTree->intoOnCommit,
|
||||
reloptions,
|
||||
allowSystemTableMods);
|
||||
|
||||
FreeTupleDesc(tupdesc);
|
||||
|
||||
/*
|
||||
* Advance command counter so that the newly-created relation's
|
||||
* catalog tuples will be visible to heap_open.
|
||||
*/
|
||||
CommandCounterIncrement();
|
||||
|
||||
/*
|
||||
* If necessary, create a TOAST table for the into relation. Note that
|
||||
* AlterTableCreateToastTable ends with CommandCounterIncrement(), so
|
||||
* that the TOAST table will be visible for insertion.
|
||||
*/
|
||||
AlterTableCreateToastTable(intoRelationId);
|
||||
|
||||
/*
|
||||
* And open the constructed table for writing.
|
||||
*/
|
||||
intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
|
||||
|
||||
/* use_wal off requires rd_targblock be initially invalid */
|
||||
Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber);
|
||||
|
||||
/*
|
||||
* We can skip WAL-logging the insertions, unless PITR is in use.
|
||||
*
|
||||
* Note that for a non-temp INTO table, this is safe only because we
|
||||
* know that the catalog changes above will have been WAL-logged, and
|
||||
* so RecordTransactionCommit will think it needs to WAL-log the
|
||||
* eventual transaction commit. Else the commit might be lost, even
|
||||
* though all the data is safely fsync'd ...
|
||||
*/
|
||||
estate->es_into_relation_use_wal = XLogArchivingActive();
|
||||
}
|
||||
|
||||
estate->es_into_relation_descriptor = intoRelationDesc;
|
||||
|
||||
queryDesc->tupDesc = tupType;
|
||||
queryDesc->planstate = planstate;
|
||||
if (estate->es_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
|
||||
OpenIntoRel(queryDesc);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -914,6 +867,7 @@ initResultRelInfo(ResultRelInfo *resultRelInfo,
|
||||
}
|
||||
resultRelInfo->ri_ConstraintExprs = NULL;
|
||||
resultRelInfo->ri_junkFilter = NULL;
|
||||
resultRelInfo->ri_projectReturning = NULL;
|
||||
|
||||
/*
|
||||
* If there are indices on the result relation, open them and save
|
||||
@ -1031,28 +985,6 @@ ExecEndPlan(PlanState *planstate, EState *estate)
|
||||
resultRelInfo++;
|
||||
}
|
||||
|
||||
/*
|
||||
* close the "into" relation if necessary, again keeping lock
|
||||
*/
|
||||
if (estate->es_into_relation_descriptor != NULL)
|
||||
{
|
||||
/*
|
||||
* If we skipped using WAL, and it's not a temp relation, we must
|
||||
* force the relation down to disk before it's safe to commit the
|
||||
* transaction. This requires forcing out any dirty buffers and then
|
||||
* doing a forced fsync.
|
||||
*/
|
||||
if (!estate->es_into_relation_use_wal &&
|
||||
!estate->es_into_relation_descriptor->rd_istemp)
|
||||
{
|
||||
FlushRelationBuffers(estate->es_into_relation_descriptor);
|
||||
/* FlushRelationBuffers will have opened rd_smgr */
|
||||
smgrimmedsync(estate->es_into_relation_descriptor->rd_smgr);
|
||||
}
|
||||
|
||||
heap_close(estate->es_into_relation_descriptor, NoLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* close any relations selected FOR UPDATE/FOR SHARE, again keeping locks
|
||||
*/
|
||||
@ -1088,6 +1020,7 @@ ExecutePlan(EState *estate,
|
||||
DestReceiver *dest)
|
||||
{
|
||||
JunkFilter *junkfilter;
|
||||
TupleTableSlot *planSlot;
|
||||
TupleTableSlot *slot;
|
||||
ItemPointer tupleid = NULL;
|
||||
ItemPointerData tuple_ctid;
|
||||
@ -1097,7 +1030,6 @@ ExecutePlan(EState *estate,
|
||||
/*
|
||||
* initialize local variables
|
||||
*/
|
||||
slot = NULL;
|
||||
current_tuple_count = 0;
|
||||
result = NULL;
|
||||
|
||||
@ -1140,22 +1072,23 @@ ExecutePlan(EState *estate,
|
||||
lnext: ;
|
||||
if (estate->es_useEvalPlan)
|
||||
{
|
||||
slot = EvalPlanQualNext(estate);
|
||||
if (TupIsNull(slot))
|
||||
slot = ExecProcNode(planstate);
|
||||
planSlot = EvalPlanQualNext(estate);
|
||||
if (TupIsNull(planSlot))
|
||||
planSlot = ExecProcNode(planstate);
|
||||
}
|
||||
else
|
||||
slot = ExecProcNode(planstate);
|
||||
planSlot = ExecProcNode(planstate);
|
||||
|
||||
/*
|
||||
* if the tuple is null, then we assume there is nothing more to
|
||||
* process so we just return null...
|
||||
*/
|
||||
if (TupIsNull(slot))
|
||||
if (TupIsNull(planSlot))
|
||||
{
|
||||
result = NULL;
|
||||
break;
|
||||
}
|
||||
slot = planSlot;
|
||||
|
||||
/*
|
||||
* if we have a junk filter, then project a new tuple with the junk
|
||||
@ -1261,7 +1194,7 @@ lnext: ;
|
||||
estate->es_snapshot->curcid);
|
||||
if (!TupIsNull(newSlot))
|
||||
{
|
||||
slot = newSlot;
|
||||
slot = planSlot = newSlot;
|
||||
estate->es_useEvalPlan = true;
|
||||
goto lmark;
|
||||
}
|
||||
@ -1282,10 +1215,12 @@ lnext: ;
|
||||
}
|
||||
|
||||
/*
|
||||
* Finally create a new "clean" tuple with all junk attributes
|
||||
* removed
|
||||
* Create a new "clean" tuple with all junk attributes removed.
|
||||
* We don't need to do this for DELETE, however (there will
|
||||
* in fact be no non-junk attributes in a DELETE!)
|
||||
*/
|
||||
slot = ExecFilterJunk(junkfilter, slot);
|
||||
if (operation != CMD_DELETE)
|
||||
slot = ExecFilterJunk(junkfilter, slot);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1296,24 +1231,22 @@ lnext: ;
|
||||
switch (operation)
|
||||
{
|
||||
case CMD_SELECT:
|
||||
ExecSelect(slot, /* slot containing tuple */
|
||||
dest, /* destination's tuple-receiver obj */
|
||||
estate);
|
||||
ExecSelect(slot, dest, estate);
|
||||
result = slot;
|
||||
break;
|
||||
|
||||
case CMD_INSERT:
|
||||
ExecInsert(slot, tupleid, estate);
|
||||
ExecInsert(slot, tupleid, planSlot, dest, estate);
|
||||
result = NULL;
|
||||
break;
|
||||
|
||||
case CMD_DELETE:
|
||||
ExecDelete(slot, tupleid, estate);
|
||||
ExecDelete(tupleid, planSlot, dest, estate);
|
||||
result = NULL;
|
||||
break;
|
||||
|
||||
case CMD_UPDATE:
|
||||
ExecUpdate(slot, tupleid, estate);
|
||||
ExecUpdate(slot, tupleid, planSlot, dest, estate);
|
||||
result = NULL;
|
||||
break;
|
||||
|
||||
@ -1364,10 +1297,7 @@ lnext: ;
|
||||
* ExecSelect
|
||||
*
|
||||
* SELECTs are easy.. we just pass the tuple to the appropriate
|
||||
* print function. The only complexity is when we do a
|
||||
* "SELECT INTO", in which case we insert the tuple into
|
||||
* the appropriate relation (note: this is a newly created relation
|
||||
* so we don't need to worry about indices or locks.)
|
||||
* output function.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static void
|
||||
@ -1375,28 +1305,6 @@ ExecSelect(TupleTableSlot *slot,
|
||||
DestReceiver *dest,
|
||||
EState *estate)
|
||||
{
|
||||
/*
|
||||
* insert the tuple into the "into relation"
|
||||
*
|
||||
* XXX this probably ought to be replaced by a separate destination
|
||||
*/
|
||||
if (estate->es_into_relation_descriptor != NULL)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
|
||||
tuple = ExecCopySlotTuple(slot);
|
||||
heap_insert(estate->es_into_relation_descriptor, tuple,
|
||||
estate->es_snapshot->curcid,
|
||||
estate->es_into_relation_use_wal,
|
||||
false); /* never any point in using FSM */
|
||||
/* we know there are no indexes to update */
|
||||
heap_freetuple(tuple);
|
||||
IncrAppended();
|
||||
}
|
||||
|
||||
/*
|
||||
* send the tuple to the destination
|
||||
*/
|
||||
(*dest->receiveSlot) (slot, dest);
|
||||
IncrRetrieved();
|
||||
(estate->es_processed)++;
|
||||
@ -1413,6 +1321,8 @@ ExecSelect(TupleTableSlot *slot,
|
||||
static void
|
||||
ExecInsert(TupleTableSlot *slot,
|
||||
ItemPointer tupleid,
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest,
|
||||
EState *estate)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
@ -1490,6 +1400,11 @@ ExecInsert(TupleTableSlot *slot,
|
||||
|
||||
/* AFTER ROW INSERT Triggers */
|
||||
ExecARInsertTriggers(estate, resultRelInfo, tuple);
|
||||
|
||||
/* Process RETURNING if present */
|
||||
if (resultRelInfo->ri_projectReturning)
|
||||
ExecProcessReturning(resultRelInfo->ri_projectReturning,
|
||||
slot, planSlot, dest);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
@ -1500,8 +1415,9 @@ ExecInsert(TupleTableSlot *slot,
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static void
|
||||
ExecDelete(TupleTableSlot *slot,
|
||||
ItemPointer tupleid,
|
||||
ExecDelete(ItemPointer tupleid,
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest,
|
||||
EState *estate)
|
||||
{
|
||||
ResultRelInfo *resultRelInfo;
|
||||
@ -1594,6 +1510,33 @@ ldelete:;
|
||||
|
||||
/* AFTER ROW DELETE Triggers */
|
||||
ExecARDeleteTriggers(estate, resultRelInfo, tupleid);
|
||||
|
||||
/* Process RETURNING if present */
|
||||
if (resultRelInfo->ri_projectReturning)
|
||||
{
|
||||
/*
|
||||
* We have to put the target tuple into a slot, which means
|
||||
* first we gotta fetch it. We can use the trigger tuple slot.
|
||||
*/
|
||||
TupleTableSlot *slot = estate->es_trig_tuple_slot;
|
||||
HeapTupleData deltuple;
|
||||
Buffer delbuffer;
|
||||
|
||||
deltuple.t_self = *tupleid;
|
||||
if (!heap_fetch(resultRelationDesc, SnapshotAny,
|
||||
&deltuple, &delbuffer, false, NULL))
|
||||
elog(ERROR, "failed to fetch deleted tuple for DELETE RETURNING");
|
||||
|
||||
if (slot->tts_tupleDescriptor != RelationGetDescr(resultRelationDesc))
|
||||
ExecSetSlotDescriptor(slot, RelationGetDescr(resultRelationDesc));
|
||||
ExecStoreTuple(&deltuple, slot, InvalidBuffer, false);
|
||||
|
||||
ExecProcessReturning(resultRelInfo->ri_projectReturning,
|
||||
slot, planSlot, dest);
|
||||
|
||||
ExecClearTuple(slot);
|
||||
ReleaseBuffer(delbuffer);
|
||||
}
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
@ -1610,6 +1553,8 @@ ldelete:;
|
||||
static void
|
||||
ExecUpdate(TupleTableSlot *slot,
|
||||
ItemPointer tupleid,
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest,
|
||||
EState *estate)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
@ -1755,8 +1700,16 @@ lreplace:;
|
||||
|
||||
/* AFTER ROW UPDATE Triggers */
|
||||
ExecARUpdateTriggers(estate, resultRelInfo, tupleid, tuple);
|
||||
|
||||
/* Process RETURNING if present */
|
||||
if (resultRelInfo->ri_projectReturning)
|
||||
ExecProcessReturning(resultRelInfo->ri_projectReturning,
|
||||
slot, planSlot, dest);
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecRelCheck --- check that tuple meets constraints for result relation
|
||||
*/
|
||||
static const char *
|
||||
ExecRelCheck(ResultRelInfo *resultRelInfo,
|
||||
TupleTableSlot *slot, EState *estate)
|
||||
@ -1853,6 +1806,42 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecProcessReturning --- evaluate a RETURNING list and send to dest
|
||||
*
|
||||
* projectReturning: RETURNING projection info for current result rel
|
||||
* tupleSlot: slot holding tuple actually inserted/updated/deleted
|
||||
* planSlot: slot holding tuple returned by top plan node
|
||||
* dest: where to send the output
|
||||
*/
|
||||
static void
|
||||
ExecProcessReturning(ProjectionInfo *projectReturning,
|
||||
TupleTableSlot *tupleSlot,
|
||||
TupleTableSlot *planSlot,
|
||||
DestReceiver *dest)
|
||||
{
|
||||
ExprContext *econtext = projectReturning->pi_exprContext;
|
||||
TupleTableSlot *retSlot;
|
||||
|
||||
/*
|
||||
* Reset per-tuple memory context to free any expression evaluation
|
||||
* storage allocated in the previous cycle.
|
||||
*/
|
||||
ResetExprContext(econtext);
|
||||
|
||||
/* Make tuple and any needed join variables available to ExecProject */
|
||||
econtext->ecxt_scantuple = tupleSlot;
|
||||
econtext->ecxt_outertuple = planSlot;
|
||||
|
||||
/* Compute the RETURNING expressions */
|
||||
retSlot = ExecProject(projectReturning, NULL);
|
||||
|
||||
/* Send to dest */
|
||||
(*dest->receiveSlot) (retSlot, dest);
|
||||
|
||||
ExecClearTuple(retSlot);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check a modified tuple to see if we want to process its updated version
|
||||
* under READ COMMITTED rules.
|
||||
@ -2318,3 +2307,269 @@ EvalPlanQualStop(evalPlanQual *epq)
|
||||
epq->estate = NULL;
|
||||
epq->planstate = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Support for SELECT INTO (a/k/a CREATE TABLE AS)
|
||||
*
|
||||
* We implement SELECT INTO by diverting SELECT's normal output with
|
||||
* a specialized DestReceiver type.
|
||||
*
|
||||
* TODO: remove some of the INTO-specific cruft from EState, and keep
|
||||
* it in the DestReceiver instead.
|
||||
*/
|
||||
|
||||
typedef struct
|
||||
{
|
||||
DestReceiver pub; /* publicly-known function pointers */
|
||||
EState *estate; /* EState we are working with */
|
||||
} DR_intorel;
|
||||
|
||||
/*
|
||||
* OpenIntoRel --- actually create the SELECT INTO target relation
|
||||
*
|
||||
* This also replaces QueryDesc->dest with the special DestReceiver for
|
||||
* SELECT INTO. We assume that the correct result tuple type has already
|
||||
* been placed in queryDesc->tupDesc.
|
||||
*/
|
||||
static void
|
||||
OpenIntoRel(QueryDesc *queryDesc)
|
||||
{
|
||||
Query *parseTree = queryDesc->parsetree;
|
||||
EState *estate = queryDesc->estate;
|
||||
Relation intoRelationDesc;
|
||||
char *intoName;
|
||||
Oid namespaceId;
|
||||
Oid tablespaceId;
|
||||
Datum reloptions;
|
||||
AclResult aclresult;
|
||||
Oid intoRelationId;
|
||||
TupleDesc tupdesc;
|
||||
DR_intorel *myState;
|
||||
|
||||
/*
|
||||
* Check consistency of arguments
|
||||
*/
|
||||
if (parseTree->intoOnCommit != ONCOMMIT_NOOP && !parseTree->into->istemp)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("ON COMMIT can only be used on temporary tables")));
|
||||
|
||||
/*
|
||||
* Find namespace to create in, check its permissions
|
||||
*/
|
||||
intoName = parseTree->into->relname;
|
||||
namespaceId = RangeVarGetCreationNamespace(parseTree->into);
|
||||
|
||||
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
|
||||
ACL_CREATE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
|
||||
get_namespace_name(namespaceId));
|
||||
|
||||
/*
|
||||
* Select tablespace to use. If not specified, use default_tablespace
|
||||
* (which may in turn default to database's default).
|
||||
*/
|
||||
if (parseTree->intoTableSpaceName)
|
||||
{
|
||||
tablespaceId = get_tablespace_oid(parseTree->intoTableSpaceName);
|
||||
if (!OidIsValid(tablespaceId))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||
errmsg("tablespace \"%s\" does not exist",
|
||||
parseTree->intoTableSpaceName)));
|
||||
} else
|
||||
{
|
||||
tablespaceId = GetDefaultTablespace();
|
||||
/* note InvalidOid is OK in this case */
|
||||
}
|
||||
|
||||
/* Check permissions except when using the database's default space */
|
||||
if (OidIsValid(tablespaceId))
|
||||
{
|
||||
AclResult aclresult;
|
||||
|
||||
aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
|
||||
ACL_CREATE);
|
||||
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
|
||||
get_tablespace_name(tablespaceId));
|
||||
}
|
||||
|
||||
/* Parse and validate any reloptions */
|
||||
reloptions = transformRelOptions((Datum) 0,
|
||||
parseTree->intoOptions,
|
||||
true,
|
||||
false);
|
||||
(void) heap_reloptions(RELKIND_RELATION, reloptions, true);
|
||||
|
||||
/* have to copy the actual tupdesc to get rid of any constraints */
|
||||
tupdesc = CreateTupleDescCopy(queryDesc->tupDesc);
|
||||
|
||||
/* Now we can actually create the new relation */
|
||||
intoRelationId = heap_create_with_catalog(intoName,
|
||||
namespaceId,
|
||||
tablespaceId,
|
||||
InvalidOid,
|
||||
GetUserId(),
|
||||
tupdesc,
|
||||
RELKIND_RELATION,
|
||||
false,
|
||||
true,
|
||||
0,
|
||||
parseTree->intoOnCommit,
|
||||
reloptions,
|
||||
allowSystemTableMods);
|
||||
|
||||
FreeTupleDesc(tupdesc);
|
||||
|
||||
/*
|
||||
* Advance command counter so that the newly-created relation's
|
||||
* catalog tuples will be visible to heap_open.
|
||||
*/
|
||||
CommandCounterIncrement();
|
||||
|
||||
/*
|
||||
* If necessary, create a TOAST table for the INTO relation. Note that
|
||||
* AlterTableCreateToastTable ends with CommandCounterIncrement(), so
|
||||
* that the TOAST table will be visible for insertion.
|
||||
*/
|
||||
AlterTableCreateToastTable(intoRelationId);
|
||||
|
||||
/*
|
||||
* And open the constructed table for writing.
|
||||
*/
|
||||
intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
|
||||
|
||||
/* use_wal off requires rd_targblock be initially invalid */
|
||||
Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber);
|
||||
|
||||
/*
|
||||
* We can skip WAL-logging the insertions, unless PITR is in use.
|
||||
*
|
||||
* Note that for a non-temp INTO table, this is safe only because we
|
||||
* know that the catalog changes above will have been WAL-logged, and
|
||||
* so RecordTransactionCommit will think it needs to WAL-log the
|
||||
* eventual transaction commit. Else the commit might be lost, even
|
||||
* though all the data is safely fsync'd ...
|
||||
*/
|
||||
estate->es_into_relation_use_wal = XLogArchivingActive();
|
||||
estate->es_into_relation_descriptor = intoRelationDesc;
|
||||
|
||||
/*
|
||||
* Now replace the query's DestReceiver with one for SELECT INTO
|
||||
*/
|
||||
queryDesc->dest = CreateDestReceiver(DestIntoRel, NULL);
|
||||
myState = (DR_intorel *) queryDesc->dest;
|
||||
Assert(myState->pub.mydest == DestIntoRel);
|
||||
myState->estate = estate;
|
||||
}
|
||||
|
||||
/*
|
||||
* CloseIntoRel --- clean up SELECT INTO at ExecutorEnd time
|
||||
*/
|
||||
static void
|
||||
CloseIntoRel(QueryDesc *queryDesc)
|
||||
{
|
||||
EState *estate = queryDesc->estate;
|
||||
|
||||
/* OpenIntoRel might never have gotten called */
|
||||
if (estate->es_into_relation_descriptor)
|
||||
{
|
||||
/*
|
||||
* If we skipped using WAL, and it's not a temp relation, we must
|
||||
* force the relation down to disk before it's safe to commit the
|
||||
* transaction. This requires forcing out any dirty buffers and then
|
||||
* doing a forced fsync.
|
||||
*/
|
||||
if (!estate->es_into_relation_use_wal &&
|
||||
!estate->es_into_relation_descriptor->rd_istemp)
|
||||
{
|
||||
FlushRelationBuffers(estate->es_into_relation_descriptor);
|
||||
/* FlushRelationBuffers will have opened rd_smgr */
|
||||
smgrimmedsync(estate->es_into_relation_descriptor->rd_smgr);
|
||||
}
|
||||
|
||||
/* close rel, but keep lock until commit */
|
||||
heap_close(estate->es_into_relation_descriptor, NoLock);
|
||||
|
||||
estate->es_into_relation_descriptor = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* CreateIntoRelDestReceiver -- create a suitable DestReceiver object
|
||||
*
|
||||
* Since CreateDestReceiver doesn't accept the parameters we'd need,
|
||||
* we just leave the private fields empty here. OpenIntoRel will
|
||||
* fill them in.
|
||||
*/
|
||||
DestReceiver *
|
||||
CreateIntoRelDestReceiver(void)
|
||||
{
|
||||
DR_intorel *self = (DR_intorel *) palloc(sizeof(DR_intorel));
|
||||
|
||||
self->pub.receiveSlot = intorel_receive;
|
||||
self->pub.rStartup = intorel_startup;
|
||||
self->pub.rShutdown = intorel_shutdown;
|
||||
self->pub.rDestroy = intorel_destroy;
|
||||
self->pub.mydest = DestIntoRel;
|
||||
|
||||
self->estate = NULL;
|
||||
|
||||
return (DestReceiver *) self;
|
||||
}
|
||||
|
||||
/*
|
||||
* intorel_startup --- executor startup
|
||||
*/
|
||||
static void
|
||||
intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
|
||||
{
|
||||
/* no-op */
|
||||
}
|
||||
|
||||
/*
|
||||
* intorel_receive --- receive one tuple
|
||||
*/
|
||||
static void
|
||||
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
|
||||
{
|
||||
DR_intorel *myState = (DR_intorel *) self;
|
||||
EState *estate = myState->estate;
|
||||
HeapTuple tuple;
|
||||
|
||||
tuple = ExecCopySlotTuple(slot);
|
||||
|
||||
heap_insert(estate->es_into_relation_descriptor,
|
||||
tuple,
|
||||
estate->es_snapshot->curcid,
|
||||
estate->es_into_relation_use_wal,
|
||||
false); /* never any point in using FSM */
|
||||
|
||||
/* We know this is a newly created relation, so there are no indexes */
|
||||
|
||||
heap_freetuple(tuple);
|
||||
|
||||
IncrAppended();
|
||||
}
|
||||
|
||||
/*
|
||||
* intorel_shutdown --- executor end
|
||||
*/
|
||||
static void
|
||||
intorel_shutdown(DestReceiver *self)
|
||||
{
|
||||
/* no-op */
|
||||
}
|
||||
|
||||
/*
|
||||
* intorel_destroy --- release DestReceiver object
|
||||
*/
|
||||
static void
|
||||
intorel_destroy(DestReceiver *self)
|
||||
{
|
||||
pfree(self);
|
||||
}
|
||||
|
@ -8,7 +8,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/executor/spi.c,v 1.153 2006/08/08 01:23:15 momjian Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/executor/spi.c,v 1.154 2006/08/12 02:52:04 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -955,6 +955,7 @@ SPI_cursor_open(const char *name, void *plan,
|
||||
PortalStart(portal, paramLI, snapshot);
|
||||
|
||||
Assert(portal->strategy == PORTAL_ONE_SELECT ||
|
||||
portal->strategy == PORTAL_ONE_RETURNING ||
|
||||
portal->strategy == PORTAL_UTIL_SELECT);
|
||||
|
||||
/* Return the created portal */
|
||||
@ -1521,17 +1522,15 @@ _SPI_pquery(QueryDesc *queryDesc, long tcount)
|
||||
switch (operation)
|
||||
{
|
||||
case CMD_SELECT:
|
||||
res = SPI_OK_SELECT;
|
||||
if (queryDesc->parsetree->into) /* select into table? */
|
||||
{
|
||||
res = SPI_OK_SELINTO;
|
||||
queryDesc->dest = None_Receiver; /* don't output results */
|
||||
}
|
||||
else if (queryDesc->dest->mydest != DestSPI)
|
||||
{
|
||||
/* Don't return SPI_OK_SELECT if we're discarding result */
|
||||
res = SPI_OK_UTILITY;
|
||||
}
|
||||
else
|
||||
res = SPI_OK_SELECT;
|
||||
break;
|
||||
case CMD_INSERT:
|
||||
res = SPI_OK_INSERT;
|
||||
|
Reference in New Issue
Block a user