mirror of
https://github.com/postgres/postgres.git
synced 2025-06-27 23:21:58 +03:00
Modified files for MERGE
This commit is contained in:
@ -42,6 +42,7 @@
|
||||
#include "commands/trigger.h"
|
||||
#include "executor/execPartition.h"
|
||||
#include "executor/executor.h"
|
||||
#include "executor/nodeMerge.h"
|
||||
#include "executor/nodeModifyTable.h"
|
||||
#include "foreign/fdwapi.h"
|
||||
#include "miscadmin.h"
|
||||
@ -62,17 +63,17 @@ static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
bool canSetTag,
|
||||
TupleTableSlot **returning);
|
||||
static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
PartitionTupleRouting *proute,
|
||||
ResultRelInfo *targetRelInfo,
|
||||
TupleTableSlot *slot);
|
||||
static ResultRelInfo *getTargetResultRelInfo(ModifyTableState *node);
|
||||
static void ExecSetupChildParentMapForTcs(ModifyTableState *mtstate);
|
||||
static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
|
||||
static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
|
||||
int whichplan);
|
||||
|
||||
/* flags for mt_merge_subcommands */
|
||||
#define MERGE_INSERT 0x01
|
||||
#define MERGE_UPDATE 0x02
|
||||
#define MERGE_DELETE 0x04
|
||||
|
||||
/*
|
||||
* Verify that the tuples to be produced by INSERT or UPDATE match the
|
||||
* target relation's rowtype
|
||||
@ -259,11 +260,12 @@ ExecCheckTIDVisible(EState *estate,
|
||||
* Returns RETURNING result if any, otherwise NULL.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
extern TupleTableSlot *
|
||||
ExecInsert(ModifyTableState *mtstate,
|
||||
TupleTableSlot *slot,
|
||||
TupleTableSlot *planSlot,
|
||||
EState *estate,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
@ -390,9 +392,17 @@ ExecInsert(ModifyTableState *mtstate,
|
||||
* partition, we should instead check UPDATE policies, because we are
|
||||
* executing policies defined on the target table, and not those
|
||||
* defined on the child partitions.
|
||||
*
|
||||
* If we're running MERGE, we refer to the action that we're executing
|
||||
* to know if we're doing an INSERT or UPDATE to a partition table.
|
||||
*/
|
||||
wco_kind = (mtstate->operation == CMD_UPDATE) ?
|
||||
WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
|
||||
if (mtstate->operation == CMD_UPDATE)
|
||||
wco_kind = WCO_RLS_UPDATE_CHECK;
|
||||
else if (mtstate->operation == CMD_MERGE)
|
||||
wco_kind = (actionState->commandType == CMD_UPDATE) ?
|
||||
WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
|
||||
else
|
||||
wco_kind = WCO_RLS_INSERT_CHECK;
|
||||
|
||||
/*
|
||||
* ExecWithCheckOptions() will skip any WCOs which are not of the kind
|
||||
@ -617,10 +627,19 @@ ExecInsert(ModifyTableState *mtstate,
|
||||
* passed to foreign table triggers; it is NULL when the foreign
|
||||
* table has no relevant triggers.
|
||||
*
|
||||
* MERGE passes actionState of the action it's currently executing;
|
||||
* regular DELETE passes NULL. This is used by ExecDelete to know if it's
|
||||
* being called from MERGE or regular DELETE operation.
|
||||
*
|
||||
* If the DELETE fails because the tuple is concurrently updated/deleted
|
||||
* by this or some other transaction, hufdp is filled with the reason as
|
||||
* well as other important information. Currently only MERGE needs this
|
||||
* information.
|
||||
*
|
||||
* Returns RETURNING result if any, otherwise NULL.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
TupleTableSlot *
|
||||
ExecDelete(ModifyTableState *mtstate,
|
||||
ItemPointer tupleid,
|
||||
HeapTuple oldtuple,
|
||||
@ -629,6 +648,8 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
bool *tupleDeleted,
|
||||
bool processReturning,
|
||||
HeapUpdateFailureData *hufdp,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
{
|
||||
ResultRelInfo *resultRelInfo;
|
||||
@ -641,6 +662,14 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
if (tupleDeleted)
|
||||
*tupleDeleted = false;
|
||||
|
||||
/*
|
||||
* Initialize hufdp. Since the caller is only interested in the failure
|
||||
* status, initialize with the state that is used to indicate successful
|
||||
* operation.
|
||||
*/
|
||||
if (hufdp)
|
||||
hufdp->result = HeapTupleMayBeUpdated;
|
||||
|
||||
/*
|
||||
* get information on the (current) result relation
|
||||
*/
|
||||
@ -654,7 +683,7 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
bool dodelete;
|
||||
|
||||
dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
|
||||
tupleid, oldtuple);
|
||||
tupleid, oldtuple, hufdp);
|
||||
|
||||
if (!dodelete) /* "do nothing" */
|
||||
return NULL;
|
||||
@ -721,6 +750,15 @@ ldelete:;
|
||||
estate->es_crosscheck_snapshot,
|
||||
true /* wait for commit */ ,
|
||||
&hufd);
|
||||
|
||||
/*
|
||||
* Copy the necessary information, if the caller has asked for it. We
|
||||
* must do this irrespective of whether the tuple was updated or
|
||||
* deleted.
|
||||
*/
|
||||
if (hufdp)
|
||||
*hufdp = hufd;
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case HeapTupleSelfUpdated:
|
||||
@ -755,7 +793,11 @@ ldelete:;
|
||||
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
|
||||
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
|
||||
|
||||
/* Else, already deleted by self; nothing to do */
|
||||
/*
|
||||
* Else, already deleted by self; nothing to do but inform
|
||||
* MERGE about it anyways so that it can take necessary
|
||||
* action.
|
||||
*/
|
||||
return NULL;
|
||||
|
||||
case HeapTupleMayBeUpdated:
|
||||
@ -766,14 +808,24 @@ ldelete:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(tupleid, &hufd.ctid))
|
||||
{
|
||||
TupleTableSlot *epqslot;
|
||||
|
||||
/*
|
||||
* If we're executing MERGE, then the onus of running
|
||||
* EvalPlanQual() and handling its outcome lies with the
|
||||
* caller.
|
||||
*/
|
||||
if (actionState != NULL)
|
||||
return NULL;
|
||||
|
||||
/* Normal DELETE path. */
|
||||
epqslot = EvalPlanQual(estate,
|
||||
epqstate,
|
||||
resultRelationDesc,
|
||||
resultRelInfo->ri_RangeTableIndex,
|
||||
GetEPQRangeTableIndex(resultRelInfo),
|
||||
LockTupleExclusive,
|
||||
&hufd.ctid,
|
||||
hufd.xmax);
|
||||
@ -783,7 +835,12 @@ ldelete:;
|
||||
goto ldelete;
|
||||
}
|
||||
}
|
||||
/* tuple already deleted; nothing to do */
|
||||
|
||||
/*
|
||||
* tuple already deleted; nothing to do. But MERGE might want
|
||||
* to handle it differently. We've already filled-in hufdp
|
||||
* with sufficient information for MERGE to look at.
|
||||
*/
|
||||
return NULL;
|
||||
|
||||
default:
|
||||
@ -911,10 +968,21 @@ ldelete:;
|
||||
* foreign table triggers; it is NULL when the foreign table has
|
||||
* no relevant triggers.
|
||||
*
|
||||
* MERGE passes actionState of the action it's currently executing;
|
||||
* regular UPDATE passes NULL. This is used by ExecUpdate to know if it's
|
||||
* being called from MERGE or regular UPDATE operation. ExecUpdate may
|
||||
* pass this information to ExecInsert if it ends up running DELETE+INSERT
|
||||
* for partition key updates.
|
||||
*
|
||||
* If the UPDATE fails because the tuple is concurrently updated/deleted
|
||||
* by this or some other transaction, hufdp is filled with the reason as
|
||||
* well as other important information. Currently only MERGE needs this
|
||||
* information.
|
||||
*
|
||||
* Returns RETURNING result if any, otherwise NULL.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
extern TupleTableSlot *
|
||||
ExecUpdate(ModifyTableState *mtstate,
|
||||
ItemPointer tupleid,
|
||||
HeapTuple oldtuple,
|
||||
@ -922,6 +990,9 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
TupleTableSlot *planSlot,
|
||||
EPQState *epqstate,
|
||||
EState *estate,
|
||||
bool *tuple_updated,
|
||||
HeapUpdateFailureData *hufdp,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
@ -938,6 +1009,17 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
if (IsBootstrapProcessingMode())
|
||||
elog(ERROR, "cannot UPDATE during bootstrap");
|
||||
|
||||
if (tuple_updated)
|
||||
*tuple_updated = false;
|
||||
|
||||
/*
|
||||
* Initialize hufdp. Since the caller is only interested in the failure
|
||||
* status, initialize with the state that is used to indicate successful
|
||||
* operation.
|
||||
*/
|
||||
if (hufdp)
|
||||
hufdp->result = HeapTupleMayBeUpdated;
|
||||
|
||||
/*
|
||||
* get the heap tuple out of the tuple table slot, making sure we have a
|
||||
* writable copy
|
||||
@ -955,7 +1037,7 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
resultRelInfo->ri_TrigDesc->trig_update_before_row)
|
||||
{
|
||||
slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
|
||||
tupleid, oldtuple, slot);
|
||||
tupleid, oldtuple, slot, hufdp);
|
||||
|
||||
if (slot == NULL) /* "do nothing" */
|
||||
return NULL;
|
||||
@ -1001,7 +1083,6 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
}
|
||||
else
|
||||
{
|
||||
LockTupleMode lockmode;
|
||||
bool partition_constraint_failed;
|
||||
|
||||
/*
|
||||
@ -1079,8 +1160,9 @@ lreplace:;
|
||||
* Row movement, part 1. Delete the tuple, but skip RETURNING
|
||||
* processing. We want to return rows from INSERT.
|
||||
*/
|
||||
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate,
|
||||
&tuple_deleted, false, false);
|
||||
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,
|
||||
estate, &tuple_deleted, false, hufdp, NULL,
|
||||
false);
|
||||
|
||||
/*
|
||||
* For some reason if DELETE didn't happen (e.g. trigger prevented
|
||||
@ -1116,16 +1198,36 @@ lreplace:;
|
||||
saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
|
||||
|
||||
/*
|
||||
* resultRelInfo is one of the per-subplan resultRelInfos. So we
|
||||
* should convert the tuple into root's tuple descriptor, since
|
||||
* ExecInsert() starts the search from root. The tuple conversion
|
||||
* map list is in the order of mtstate->resultRelInfo[], so to
|
||||
* retrieve the one for this resultRel, we need to know the
|
||||
* position of the resultRel in mtstate->resultRelInfo[].
|
||||
* We should convert the tuple into root's tuple descriptor, since
|
||||
* ExecInsert() starts the search from root. To do that, we need to
|
||||
* retrieve the tuple conversion map for this resultRelInfo.
|
||||
*
|
||||
* If we're running MERGE then resultRelInfo is per-partition
|
||||
* resultRelInfo as initialized in ExecInitPartitionInfo(). Note
|
||||
* that we don't expand inheritance for the resultRelation in case
|
||||
* of MERGE and hence there is just one subplan. Whereas for
|
||||
* regular UPDATE, resultRelInfo is one of the per-subplan
|
||||
* resultRelInfos. In either case the position of this partition in
|
||||
* tracked in ri_PartitionLeafIndex;
|
||||
*
|
||||
* Retrieve the map either by looking at the resultRelInfo's
|
||||
* position in mtstate->resultRelInfo[] (for UPDATE) or by simply
|
||||
* using the ri_PartitionLeafIndex value (for MERGE).
|
||||
*/
|
||||
map_index = resultRelInfo - mtstate->resultRelInfo;
|
||||
Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
|
||||
tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
|
||||
if (mtstate->operation == CMD_MERGE)
|
||||
{
|
||||
map_index = resultRelInfo->ri_PartitionLeafIndex;
|
||||
Assert(mtstate->rootResultRelInfo == NULL);
|
||||
tupconv_map = TupConvMapForLeaf(proute,
|
||||
mtstate->resultRelInfo,
|
||||
map_index);
|
||||
}
|
||||
else
|
||||
{
|
||||
map_index = resultRelInfo - mtstate->resultRelInfo;
|
||||
Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
|
||||
tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
|
||||
}
|
||||
tuple = ConvertPartitionTupleSlot(tupconv_map,
|
||||
tuple,
|
||||
proute->root_tuple_slot,
|
||||
@ -1135,12 +1237,16 @@ lreplace:;
|
||||
* Prepare for tuple routing, making it look like we're inserting
|
||||
* into the root.
|
||||
*/
|
||||
Assert(mtstate->rootResultRelInfo != NULL);
|
||||
slot = ExecPrepareTupleRouting(mtstate, estate, proute,
|
||||
mtstate->rootResultRelInfo, slot);
|
||||
getTargetResultRelInfo(mtstate),
|
||||
slot);
|
||||
|
||||
ret_slot = ExecInsert(mtstate, slot, planSlot,
|
||||
estate, canSetTag);
|
||||
estate, actionState, canSetTag);
|
||||
|
||||
/* Update is successful. */
|
||||
if (tuple_updated)
|
||||
*tuple_updated = true;
|
||||
|
||||
/* Revert ExecPrepareTupleRouting's node change. */
|
||||
estate->es_result_relation_info = resultRelInfo;
|
||||
@ -1178,7 +1284,16 @@ lreplace:;
|
||||
estate->es_output_cid,
|
||||
estate->es_crosscheck_snapshot,
|
||||
true /* wait for commit */ ,
|
||||
&hufd, &lockmode);
|
||||
&hufd);
|
||||
|
||||
/*
|
||||
* Copy the necessary information, if the caller has asked for it. We
|
||||
* must do this irrespective of whether the tuple was updated or
|
||||
* deleted.
|
||||
*/
|
||||
if (hufdp)
|
||||
*hufdp = hufd;
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case HeapTupleSelfUpdated:
|
||||
@ -1223,26 +1338,42 @@ lreplace:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(tupleid, &hufd.ctid))
|
||||
{
|
||||
TupleTableSlot *epqslot;
|
||||
|
||||
/*
|
||||
* If we're executing MERGE, then the onus of running
|
||||
* EvalPlanQual() and handling its outcome lies with the
|
||||
* caller.
|
||||
*/
|
||||
if (actionState != NULL)
|
||||
return NULL;
|
||||
|
||||
/* Regular UPDATE path. */
|
||||
epqslot = EvalPlanQual(estate,
|
||||
epqstate,
|
||||
resultRelationDesc,
|
||||
resultRelInfo->ri_RangeTableIndex,
|
||||
lockmode,
|
||||
GetEPQRangeTableIndex(resultRelInfo),
|
||||
hufd.lockmode,
|
||||
&hufd.ctid,
|
||||
hufd.xmax);
|
||||
if (!TupIsNull(epqslot))
|
||||
{
|
||||
*tupleid = hufd.ctid;
|
||||
/* Normal UPDATE path */
|
||||
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
|
||||
tuple = ExecMaterializeSlot(slot);
|
||||
goto lreplace;
|
||||
}
|
||||
}
|
||||
/* tuple already deleted; nothing to do */
|
||||
|
||||
/*
|
||||
* tuple already deleted; nothing to do. But MERGE might want
|
||||
* to handle it differently. We've already filled-in hufdp
|
||||
* with sufficient information for MERGE to look at.
|
||||
*/
|
||||
return NULL;
|
||||
|
||||
default:
|
||||
@ -1271,6 +1402,9 @@ lreplace:;
|
||||
estate, false, NULL, NIL);
|
||||
}
|
||||
|
||||
if (tuple_updated)
|
||||
*tuple_updated = true;
|
||||
|
||||
if (canSetTag)
|
||||
(estate->es_processed)++;
|
||||
|
||||
@ -1365,9 +1499,9 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
* there's no historical behavior to break.
|
||||
*
|
||||
* It is the user's responsibility to prevent this situation from
|
||||
* occurring. These problems are why SQL-2003 similarly specifies
|
||||
* that for SQL MERGE, an exception must be raised in the event of
|
||||
* an attempt to update the same row twice.
|
||||
* occurring. These problems are why SQL Standard similarly
|
||||
* specifies that for SQL MERGE, an exception must be raised in
|
||||
* the event of an attempt to update the same row twice.
|
||||
*/
|
||||
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple.t_data)))
|
||||
ereport(ERROR,
|
||||
@ -1489,7 +1623,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
*returning = ExecUpdate(mtstate, &tuple.t_self, NULL,
|
||||
mtstate->mt_conflproj, planSlot,
|
||||
&mtstate->mt_epqstate, mtstate->ps.state,
|
||||
canSetTag);
|
||||
NULL, NULL, NULL, canSetTag);
|
||||
|
||||
ReleaseBuffer(buffer);
|
||||
return true;
|
||||
@ -1527,6 +1661,14 @@ fireBSTriggers(ModifyTableState *node)
|
||||
case CMD_DELETE:
|
||||
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
|
||||
break;
|
||||
case CMD_MERGE:
|
||||
if (node->mt_merge_subcommands & MERGE_INSERT)
|
||||
ExecBSInsertTriggers(node->ps.state, resultRelInfo);
|
||||
if (node->mt_merge_subcommands & MERGE_UPDATE)
|
||||
ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
|
||||
if (node->mt_merge_subcommands & MERGE_DELETE)
|
||||
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
@ -1582,6 +1724,17 @@ fireASTriggers(ModifyTableState *node)
|
||||
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
break;
|
||||
case CMD_MERGE:
|
||||
if (node->mt_merge_subcommands & MERGE_DELETE)
|
||||
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
if (node->mt_merge_subcommands & MERGE_UPDATE)
|
||||
ExecASUpdateTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
if (node->mt_merge_subcommands & MERGE_INSERT)
|
||||
ExecASInsertTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
@ -1644,7 +1797,7 @@ ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
|
||||
*
|
||||
* Returns a slot holding the tuple of the partition rowtype.
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
TupleTableSlot *
|
||||
ExecPrepareTupleRouting(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
PartitionTupleRouting *proute,
|
||||
@ -1967,6 +2120,7 @@ ExecModifyTable(PlanState *pstate)
|
||||
{
|
||||
/* advance to next subplan if any */
|
||||
node->mt_whichplan++;
|
||||
|
||||
if (node->mt_whichplan < node->mt_nplans)
|
||||
{
|
||||
resultRelInfo++;
|
||||
@ -2015,6 +2169,12 @@ ExecModifyTable(PlanState *pstate)
|
||||
EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
|
||||
slot = planSlot;
|
||||
|
||||
if (operation == CMD_MERGE)
|
||||
{
|
||||
ExecMerge(node, estate, slot, junkfilter, resultRelInfo);
|
||||
continue;
|
||||
}
|
||||
|
||||
tupleid = NULL;
|
||||
oldtuple = NULL;
|
||||
if (junkfilter != NULL)
|
||||
@ -2096,19 +2256,20 @@ ExecModifyTable(PlanState *pstate)
|
||||
slot = ExecPrepareTupleRouting(node, estate, proute,
|
||||
resultRelInfo, slot);
|
||||
slot = ExecInsert(node, slot, planSlot,
|
||||
estate, node->canSetTag);
|
||||
estate, NULL, node->canSetTag);
|
||||
/* Revert ExecPrepareTupleRouting's state change. */
|
||||
if (proute)
|
||||
estate->es_result_relation_info = resultRelInfo;
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
slot = ExecUpdate(node, tupleid, oldtuple, slot, planSlot,
|
||||
&node->mt_epqstate, estate, node->canSetTag);
|
||||
&node->mt_epqstate, estate,
|
||||
NULL, NULL, NULL, node->canSetTag);
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
slot = ExecDelete(node, tupleid, oldtuple, planSlot,
|
||||
&node->mt_epqstate, estate,
|
||||
NULL, true, node->canSetTag);
|
||||
NULL, true, NULL, NULL, node->canSetTag);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
@ -2198,6 +2359,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
saved_resultRelInfo = estate->es_result_relation_info;
|
||||
|
||||
resultRelInfo = mtstate->resultRelInfo;
|
||||
|
||||
/*
|
||||
* mergeTargetRelation must be set if we're running MERGE and mustn't be
|
||||
* set if we're not.
|
||||
*/
|
||||
Assert(operation != CMD_MERGE || node->mergeTargetRelation > 0);
|
||||
Assert(operation == CMD_MERGE || node->mergeTargetRelation == 0);
|
||||
|
||||
resultRelInfo->ri_mergeTargetRTI = node->mergeTargetRelation;
|
||||
|
||||
i = 0;
|
||||
foreach(l, node->plans)
|
||||
{
|
||||
@ -2276,7 +2447,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
* partition key.
|
||||
*/
|
||||
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
|
||||
(operation == CMD_INSERT || update_tuple_routing_needed))
|
||||
(operation == CMD_INSERT || operation == CMD_MERGE ||
|
||||
update_tuple_routing_needed))
|
||||
mtstate->mt_partition_tuple_routing =
|
||||
ExecSetupPartitionTupleRouting(mtstate, rel);
|
||||
|
||||
@ -2287,6 +2459,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
|
||||
ExecSetupTransitionCaptureState(mtstate, estate);
|
||||
|
||||
/*
|
||||
* If we are doing MERGE then setup child-parent mapping. This will be
|
||||
* required in case we end up doing a partition-key update, triggering a
|
||||
* tuple routing.
|
||||
*/
|
||||
if (mtstate->operation == CMD_MERGE &&
|
||||
mtstate->mt_partition_tuple_routing != NULL)
|
||||
ExecSetupChildParentMapForLeaf(mtstate->mt_partition_tuple_routing);
|
||||
|
||||
/*
|
||||
* Construct mapping from each of the per-subplan partition attnos to the
|
||||
* root attno. This is required when during update row movement the tuple
|
||||
@ -2478,6 +2659,106 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
}
|
||||
}
|
||||
|
||||
resultRelInfo = mtstate->resultRelInfo;
|
||||
|
||||
if (node->mergeActionList)
|
||||
{
|
||||
ListCell *l;
|
||||
ExprContext *econtext;
|
||||
List *mergeMatchedActionStates = NIL;
|
||||
List *mergeNotMatchedActionStates = NIL;
|
||||
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
|
||||
|
||||
mtstate->mt_merge_subcommands = 0;
|
||||
|
||||
if (mtstate->ps.ps_ExprContext == NULL)
|
||||
ExecAssignExprContext(estate, &mtstate->ps);
|
||||
|
||||
econtext = mtstate->ps.ps_ExprContext;
|
||||
|
||||
/* initialize slot for the existing tuple */
|
||||
Assert(mtstate->mt_existing == NULL);
|
||||
mtstate->mt_existing =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/* initialize slot for merge actions */
|
||||
Assert(mtstate->mt_mergeproj == NULL);
|
||||
mtstate->mt_mergeproj =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/*
|
||||
* Create a MergeActionState for each action on the mergeActionList
|
||||
* and add it to either a list of matched actions or not-matched
|
||||
* actions.
|
||||
*/
|
||||
foreach(l, node->mergeActionList)
|
||||
{
|
||||
MergeAction *action = (MergeAction *) lfirst(l);
|
||||
MergeActionState *action_state = makeNode(MergeActionState);
|
||||
TupleDesc tupDesc;
|
||||
|
||||
action_state->matched = action->matched;
|
||||
action_state->commandType = action->commandType;
|
||||
action_state->whenqual = ExecInitQual((List *) action->qual,
|
||||
&mtstate->ps);
|
||||
|
||||
/* create target slot for this action's projection */
|
||||
tupDesc = ExecTypeFromTL((List *) action->targetList,
|
||||
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
|
||||
action_state->tupDesc = tupDesc;
|
||||
|
||||
/* build action projection state */
|
||||
action_state->proj =
|
||||
ExecBuildProjectionInfo(action->targetList, econtext,
|
||||
mtstate->mt_mergeproj, &mtstate->ps,
|
||||
resultRelInfo->ri_RelationDesc->rd_att);
|
||||
|
||||
/*
|
||||
* We create two lists - one for WHEN MATCHED actions and one
|
||||
* for WHEN NOT MATCHED actions - and stick the
|
||||
* MergeActionState into the appropriate list.
|
||||
*/
|
||||
if (action_state->matched)
|
||||
mergeMatchedActionStates =
|
||||
lappend(mergeMatchedActionStates, action_state);
|
||||
else
|
||||
mergeNotMatchedActionStates =
|
||||
lappend(mergeNotMatchedActionStates, action_state);
|
||||
|
||||
switch (action->commandType)
|
||||
{
|
||||
case CMD_INSERT:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_INSERT;
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
mtstate->mt_merge_subcommands |= MERGE_DELETE;
|
||||
break;
|
||||
case CMD_NOTHING:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
}
|
||||
|
||||
resultRelInfo->ri_mergeState->matchedActionStates =
|
||||
mergeMatchedActionStates;
|
||||
resultRelInfo->ri_mergeState->notMatchedActionStates =
|
||||
mergeNotMatchedActionStates;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* select first subplan */
|
||||
mtstate->mt_whichplan = 0;
|
||||
subplan = (Plan *) linitial(node->plans);
|
||||
@ -2491,7 +2772,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
* --- no need to look first. Typically, this will be a 'ctid' or
|
||||
* 'wholerow' attribute, but in the case of a foreign data wrapper it
|
||||
* might be a set of junk attributes sufficient to identify the remote
|
||||
* row.
|
||||
* row. We follow this logic for MERGE, so it always has a junk attributes.
|
||||
*
|
||||
* If there are multiple result relations, each one needs its own junk
|
||||
* filter. Note multiple rels are only possible for UPDATE/DELETE, so we
|
||||
@ -2519,6 +2800,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
case CMD_DELETE:
|
||||
case CMD_MERGE:
|
||||
junk_filter_needed = true;
|
||||
break;
|
||||
default:
|
||||
@ -2534,6 +2816,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
JunkFilter *j;
|
||||
|
||||
subplan = mtstate->mt_plans[i]->plan;
|
||||
|
||||
if (operation == CMD_INSERT || operation == CMD_UPDATE)
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
subplan->targetlist);
|
||||
@ -2542,7 +2825,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
resultRelInfo->ri_RelationDesc->rd_att->tdhasoid,
|
||||
ExecInitExtraTupleSlot(estate, NULL));
|
||||
|
||||
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||
if (operation == CMD_UPDATE ||
|
||||
operation == CMD_DELETE ||
|
||||
operation == CMD_MERGE)
|
||||
{
|
||||
/* For UPDATE/DELETE, find the appropriate junk attr now */
|
||||
char relkind;
|
||||
@ -2555,6 +2840,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid");
|
||||
if (!AttributeNumberIsValid(j->jf_junkAttNo))
|
||||
elog(ERROR, "could not find junk ctid column");
|
||||
|
||||
if (operation == CMD_MERGE &&
|
||||
relkind == RELKIND_PARTITIONED_TABLE)
|
||||
{
|
||||
j->jf_otherJunkAttNo = ExecFindJunkAttribute(j, "tableoid");
|
||||
if (!AttributeNumberIsValid(j->jf_otherJunkAttNo))
|
||||
elog(ERROR, "could not find junk tableoid column");
|
||||
|
||||
}
|
||||
}
|
||||
else if (relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
|
Reference in New Issue
Block a user