mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
MERGE SQL Command following SQL:2016
MERGE performs actions that modify rows in the target table using a source table or query. MERGE provides a single SQL statement that can conditionally INSERT/UPDATE/DELETE rows a task that would other require multiple PL statements. e.g. MERGE INTO target AS t USING source AS s ON t.tid = s.sid WHEN MATCHED AND t.balance > s.delta THEN UPDATE SET balance = t.balance - s.delta WHEN MATCHED THEN DELETE WHEN NOT MATCHED AND s.delta > 0 THEN INSERT VALUES (s.sid, s.delta) WHEN NOT MATCHED THEN DO NOTHING; MERGE works with regular and partitioned tables, including column and row security enforcement, as well as support for row, statement and transition triggers. MERGE is optimized for OLTP and is parameterizable, though also useful for large scale ETL/ELT. MERGE is not intended to be used in preference to existing single SQL commands for INSERT, UPDATE or DELETE since there is some overhead. MERGE can be used statically from PL/pgSQL. MERGE does not yet support inheritance, write rules, RETURNING clauses, updatable views or foreign tables. MERGE follows SQL Standard per the most recent SQL:2016. Includes full tests and documentation, including full isolation tests to demonstrate the concurrent behavior. This version written from scratch in 2017 by Simon Riggs, using docs and tests originally written in 2009. Later work from Pavan Deolasee has been both complex and deep, leaving the lead author credit now in his hands. Extensive discussion of concurrency from Peter Geoghegan, with thanks for the time and effort contributed. Various issues reported via sqlsmith by Andreas Seltenreich Authors: Pavan Deolasee, Simon Riggs Reviewer: Peter Geoghegan, Amit Langote, Tomas Vondra, Simon Riggs Discussion: https://postgr.es/m/CANP8+jKitBSrB7oTgT9CY2i1ObfOt36z0XMraQc+Xrz8QB0nXA@mail.gmail.com https://postgr.es/m/CAH2-WzkJdBuxj9PO=2QaO9-3h3xGbQPZ34kJH=HukRekwM-GZg@mail.gmail.com
This commit is contained in:
@ -22,7 +22,7 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
|
||||
nodeCustom.o nodeFunctionscan.o nodeGather.o \
|
||||
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
|
||||
nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
|
||||
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
|
||||
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeMerge.o nodeModifyTable.o \
|
||||
nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
|
||||
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
|
||||
nodeValuesscan.o \
|
||||
|
@ -37,6 +37,16 @@ the plan tree returns the computed tuples to be updated, plus a "junk"
|
||||
one. For DELETE, the plan tree need only deliver a CTID column, and the
|
||||
ModifyTable node visits each of those rows and marks the row deleted.
|
||||
|
||||
MERGE runs one generic plan that returns candidate target rows. Each row
|
||||
consists of a super-row that contains all the columns needed by any of the
|
||||
individual actions, plus a CTID and a TABLEOID junk columns. The CTID column is
|
||||
required to know if a matching target row was found or not and the TABLEOID
|
||||
column is needed to find the underlying target partition, in case when the
|
||||
target table is a partition table. If the CTID column is set we attempt to
|
||||
activate WHEN MATCHED actions, or if it is NULL then we will attempt to
|
||||
activate WHEN NOT MATCHED actions. Once we know which action is activated we
|
||||
form the final result row and apply only those changes.
|
||||
|
||||
XXX a great deal more documentation needs to be written here...
|
||||
|
||||
|
||||
|
@ -233,6 +233,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||
case CMD_INSERT:
|
||||
case CMD_DELETE:
|
||||
case CMD_UPDATE:
|
||||
case CMD_MERGE:
|
||||
estate->es_output_cid = GetCurrentCommandId(true);
|
||||
break;
|
||||
|
||||
@ -1357,6 +1358,9 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
|
||||
resultRelInfo->ri_onConflictArbiterIndexes = NIL;
|
||||
resultRelInfo->ri_onConflict = NULL;
|
||||
|
||||
resultRelInfo->ri_mergeTargetRTI = 0;
|
||||
resultRelInfo->ri_mergeState = (MergeState *) palloc0(sizeof (MergeState));
|
||||
|
||||
/*
|
||||
* Partition constraint, which also includes the partition constraint of
|
||||
* all the ancestors that are partitions. Note that it will be checked
|
||||
@ -2205,6 +2209,19 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
|
||||
errmsg("new row violates row-level security policy for table \"%s\"",
|
||||
wco->relname)));
|
||||
break;
|
||||
case WCO_RLS_MERGE_UPDATE_CHECK:
|
||||
case WCO_RLS_MERGE_DELETE_CHECK:
|
||||
if (wco->polname != NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("target row violates row-level security policy \"%s\" (USING expression) for table \"%s\"",
|
||||
wco->polname, wco->relname)));
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("target row violates row-level security policy (USING expression) for table \"%s\"",
|
||||
wco->relname)));
|
||||
break;
|
||||
case WCO_RLS_CONFLICT_CHECK:
|
||||
if (wco->polname != NULL)
|
||||
ereport(ERROR,
|
||||
|
@ -67,6 +67,8 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
||||
ResultRelInfo *update_rri = NULL;
|
||||
int num_update_rri = 0,
|
||||
update_rri_index = 0;
|
||||
bool is_update = false;
|
||||
bool is_merge = false;
|
||||
PartitionTupleRouting *proute;
|
||||
int nparts;
|
||||
ModifyTable *node = mtstate ? (ModifyTable *) mtstate->ps.plan : NULL;
|
||||
@ -89,13 +91,22 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
||||
|
||||
/* Set up details specific to the type of tuple routing we are doing. */
|
||||
if (node && node->operation == CMD_UPDATE)
|
||||
is_update = true;
|
||||
else if (node && node->operation == CMD_MERGE)
|
||||
is_merge = true;
|
||||
|
||||
if (is_update)
|
||||
{
|
||||
update_rri = mtstate->resultRelInfo;
|
||||
num_update_rri = list_length(node->plans);
|
||||
proute->subplan_partition_offsets =
|
||||
palloc(num_update_rri * sizeof(int));
|
||||
proute->num_subplan_partition_offsets = num_update_rri;
|
||||
}
|
||||
|
||||
|
||||
if (is_update || is_merge)
|
||||
{
|
||||
/*
|
||||
* We need an additional tuple slot for storing transient tuples that
|
||||
* are converted to the root table descriptor.
|
||||
@ -299,6 +310,25 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Given OID of the partition leaf, return the index of the leaf in the
|
||||
* partition hierarchy.
|
||||
*/
|
||||
int
|
||||
ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < proute->num_partitions; i++)
|
||||
{
|
||||
if (proute->partition_oids[i] == partoid)
|
||||
break;
|
||||
}
|
||||
|
||||
Assert(i < proute->num_partitions);
|
||||
return i;
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecInitPartitionInfo
|
||||
* Initialize ResultRelInfo and other information for a partition if not
|
||||
@ -337,6 +367,8 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
rootrel,
|
||||
estate->es_instrument);
|
||||
|
||||
leaf_part_rri->ri_PartitionLeafIndex = partidx;
|
||||
|
||||
/*
|
||||
* Verify result relation is a valid target for an INSERT. An UPDATE of a
|
||||
* partition-key becomes a DELETE+INSERT operation, so this check is still
|
||||
@ -625,6 +657,90 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
Assert(proute->partitions[partidx] == NULL);
|
||||
proute->partitions[partidx] = leaf_part_rri;
|
||||
|
||||
/*
|
||||
* Initialize information about this partition that's needed to handle
|
||||
* MERGE.
|
||||
*/
|
||||
if (node && node->operation == CMD_MERGE)
|
||||
{
|
||||
TupleDesc partrelDesc = RelationGetDescr(partrel);
|
||||
TupleConversionMap *map = proute->parent_child_tupconv_maps[partidx];
|
||||
int firstVarno = mtstate->resultRelInfo[0].ri_RangeTableIndex;
|
||||
Relation firstResultRel = mtstate->resultRelInfo[0].ri_RelationDesc;
|
||||
|
||||
/*
|
||||
* If the root parent and partition have the same tuple
|
||||
* descriptor, just reuse the original MERGE state for partition.
|
||||
*/
|
||||
if (map == NULL)
|
||||
{
|
||||
leaf_part_rri->ri_mergeState = resultRelInfo->ri_mergeState;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Convert expressions contain partition's attnos. */
|
||||
List *conv_tl, *conv_qual;
|
||||
ListCell *l;
|
||||
List *matchedActionStates = NIL;
|
||||
List *notMatchedActionStates = NIL;
|
||||
|
||||
foreach (l, node->mergeActionList)
|
||||
{
|
||||
MergeAction *action = lfirst_node(MergeAction, l);
|
||||
MergeActionState *action_state = makeNode(MergeActionState);
|
||||
TupleDesc tupDesc;
|
||||
ExprContext *econtext;
|
||||
|
||||
action_state->matched = action->matched;
|
||||
action_state->commandType = action->commandType;
|
||||
|
||||
conv_qual = (List *) action->qual;
|
||||
conv_qual = map_partition_varattnos(conv_qual,
|
||||
firstVarno, partrel,
|
||||
firstResultRel, NULL);
|
||||
|
||||
action_state->whenqual = ExecInitQual(conv_qual, &mtstate->ps);
|
||||
|
||||
conv_tl = (List *) action->targetList;
|
||||
conv_tl = map_partition_varattnos(conv_tl,
|
||||
firstVarno, partrel,
|
||||
firstResultRel, NULL);
|
||||
|
||||
conv_tl = adjust_partition_tlist( conv_tl, map);
|
||||
|
||||
tupDesc = ExecTypeFromTL(conv_tl, partrelDesc->tdhasoid);
|
||||
action_state->tupDesc = tupDesc;
|
||||
|
||||
/* build action projection state */
|
||||
econtext = mtstate->ps.ps_ExprContext;
|
||||
action_state->proj =
|
||||
ExecBuildProjectionInfo(conv_tl, econtext,
|
||||
mtstate->mt_mergeproj,
|
||||
&mtstate->ps,
|
||||
partrelDesc);
|
||||
|
||||
if (action_state->matched)
|
||||
matchedActionStates =
|
||||
lappend(matchedActionStates, action_state);
|
||||
else
|
||||
notMatchedActionStates =
|
||||
lappend(notMatchedActionStates, action_state);
|
||||
}
|
||||
leaf_part_rri->ri_mergeState->matchedActionStates =
|
||||
matchedActionStates;
|
||||
leaf_part_rri->ri_mergeState->notMatchedActionStates =
|
||||
notMatchedActionStates;
|
||||
}
|
||||
|
||||
/*
|
||||
* get_partition_dispatch_recurse() and expand_partitioned_rtentry()
|
||||
* fetch the leaf OIDs in the same order. So we can safely derive the
|
||||
* index of the merge target relation corresponding to this partition
|
||||
* by simply adding partidx + 1 to the root's merge target relation.
|
||||
*/
|
||||
leaf_part_rri->ri_mergeTargetRTI = node->mergeTargetRelation +
|
||||
partidx + 1;
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
return leaf_part_rri;
|
||||
|
@ -454,7 +454,7 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
|
||||
{
|
||||
slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
|
||||
&searchslot->tts_tuple->t_self,
|
||||
NULL, slot);
|
||||
NULL, slot, NULL);
|
||||
|
||||
if (slot == NULL) /* "do nothing" */
|
||||
skip_tuple = true;
|
||||
@ -515,7 +515,7 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
|
||||
{
|
||||
skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
|
||||
&searchslot->tts_tuple->t_self,
|
||||
NULL);
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
if (!skip_tuple)
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include "commands/trigger.h"
|
||||
#include "executor/execPartition.h"
|
||||
#include "executor/executor.h"
|
||||
#include "executor/nodeMerge.h"
|
||||
#include "executor/nodeModifyTable.h"
|
||||
#include "foreign/fdwapi.h"
|
||||
#include "miscadmin.h"
|
||||
@ -62,17 +63,17 @@ static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
bool canSetTag,
|
||||
TupleTableSlot **returning);
|
||||
static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
PartitionTupleRouting *proute,
|
||||
ResultRelInfo *targetRelInfo,
|
||||
TupleTableSlot *slot);
|
||||
static ResultRelInfo *getTargetResultRelInfo(ModifyTableState *node);
|
||||
static void ExecSetupChildParentMapForTcs(ModifyTableState *mtstate);
|
||||
static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
|
||||
static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
|
||||
int whichplan);
|
||||
|
||||
/* flags for mt_merge_subcommands */
|
||||
#define MERGE_INSERT 0x01
|
||||
#define MERGE_UPDATE 0x02
|
||||
#define MERGE_DELETE 0x04
|
||||
|
||||
/*
|
||||
* Verify that the tuples to be produced by INSERT or UPDATE match the
|
||||
* target relation's rowtype
|
||||
@ -259,11 +260,12 @@ ExecCheckTIDVisible(EState *estate,
|
||||
* Returns RETURNING result if any, otherwise NULL.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
extern TupleTableSlot *
|
||||
ExecInsert(ModifyTableState *mtstate,
|
||||
TupleTableSlot *slot,
|
||||
TupleTableSlot *planSlot,
|
||||
EState *estate,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
@ -390,9 +392,17 @@ ExecInsert(ModifyTableState *mtstate,
|
||||
* partition, we should instead check UPDATE policies, because we are
|
||||
* executing policies defined on the target table, and not those
|
||||
* defined on the child partitions.
|
||||
*
|
||||
* If we're running MERGE, we refer to the action that we're executing
|
||||
* to know if we're doing an INSERT or UPDATE to a partition table.
|
||||
*/
|
||||
wco_kind = (mtstate->operation == CMD_UPDATE) ?
|
||||
WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
|
||||
if (mtstate->operation == CMD_UPDATE)
|
||||
wco_kind = WCO_RLS_UPDATE_CHECK;
|
||||
else if (mtstate->operation == CMD_MERGE)
|
||||
wco_kind = (actionState->commandType == CMD_UPDATE) ?
|
||||
WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
|
||||
else
|
||||
wco_kind = WCO_RLS_INSERT_CHECK;
|
||||
|
||||
/*
|
||||
* ExecWithCheckOptions() will skip any WCOs which are not of the kind
|
||||
@ -617,10 +627,19 @@ ExecInsert(ModifyTableState *mtstate,
|
||||
* passed to foreign table triggers; it is NULL when the foreign
|
||||
* table has no relevant triggers.
|
||||
*
|
||||
* MERGE passes actionState of the action it's currently executing;
|
||||
* regular DELETE passes NULL. This is used by ExecDelete to know if it's
|
||||
* being called from MERGE or regular DELETE operation.
|
||||
*
|
||||
* If the DELETE fails because the tuple is concurrently updated/deleted
|
||||
* by this or some other transaction, hufdp is filled with the reason as
|
||||
* well as other important information. Currently only MERGE needs this
|
||||
* information.
|
||||
*
|
||||
* Returns RETURNING result if any, otherwise NULL.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
TupleTableSlot *
|
||||
ExecDelete(ModifyTableState *mtstate,
|
||||
ItemPointer tupleid,
|
||||
HeapTuple oldtuple,
|
||||
@ -629,6 +648,8 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
bool *tupleDeleted,
|
||||
bool processReturning,
|
||||
HeapUpdateFailureData *hufdp,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
{
|
||||
ResultRelInfo *resultRelInfo;
|
||||
@ -641,6 +662,14 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
if (tupleDeleted)
|
||||
*tupleDeleted = false;
|
||||
|
||||
/*
|
||||
* Initialize hufdp. Since the caller is only interested in the failure
|
||||
* status, initialize with the state that is used to indicate successful
|
||||
* operation.
|
||||
*/
|
||||
if (hufdp)
|
||||
hufdp->result = HeapTupleMayBeUpdated;
|
||||
|
||||
/*
|
||||
* get information on the (current) result relation
|
||||
*/
|
||||
@ -654,7 +683,7 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
bool dodelete;
|
||||
|
||||
dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
|
||||
tupleid, oldtuple);
|
||||
tupleid, oldtuple, hufdp);
|
||||
|
||||
if (!dodelete) /* "do nothing" */
|
||||
return NULL;
|
||||
@ -721,6 +750,15 @@ ldelete:;
|
||||
estate->es_crosscheck_snapshot,
|
||||
true /* wait for commit */ ,
|
||||
&hufd);
|
||||
|
||||
/*
|
||||
* Copy the necessary information, if the caller has asked for it. We
|
||||
* must do this irrespective of whether the tuple was updated or
|
||||
* deleted.
|
||||
*/
|
||||
if (hufdp)
|
||||
*hufdp = hufd;
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case HeapTupleSelfUpdated:
|
||||
@ -755,7 +793,11 @@ ldelete:;
|
||||
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
|
||||
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
|
||||
|
||||
/* Else, already deleted by self; nothing to do */
|
||||
/*
|
||||
* Else, already deleted by self; nothing to do but inform
|
||||
* MERGE about it anyways so that it can take necessary
|
||||
* action.
|
||||
*/
|
||||
return NULL;
|
||||
|
||||
case HeapTupleMayBeUpdated:
|
||||
@ -766,14 +808,24 @@ ldelete:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(tupleid, &hufd.ctid))
|
||||
{
|
||||
TupleTableSlot *epqslot;
|
||||
|
||||
/*
|
||||
* If we're executing MERGE, then the onus of running
|
||||
* EvalPlanQual() and handling its outcome lies with the
|
||||
* caller.
|
||||
*/
|
||||
if (actionState != NULL)
|
||||
return NULL;
|
||||
|
||||
/* Normal DELETE path. */
|
||||
epqslot = EvalPlanQual(estate,
|
||||
epqstate,
|
||||
resultRelationDesc,
|
||||
resultRelInfo->ri_RangeTableIndex,
|
||||
GetEPQRangeTableIndex(resultRelInfo),
|
||||
LockTupleExclusive,
|
||||
&hufd.ctid,
|
||||
hufd.xmax);
|
||||
@ -783,7 +835,12 @@ ldelete:;
|
||||
goto ldelete;
|
||||
}
|
||||
}
|
||||
/* tuple already deleted; nothing to do */
|
||||
|
||||
/*
|
||||
* tuple already deleted; nothing to do. But MERGE might want
|
||||
* to handle it differently. We've already filled-in hufdp
|
||||
* with sufficient information for MERGE to look at.
|
||||
*/
|
||||
return NULL;
|
||||
|
||||
default:
|
||||
@ -911,10 +968,21 @@ ldelete:;
|
||||
* foreign table triggers; it is NULL when the foreign table has
|
||||
* no relevant triggers.
|
||||
*
|
||||
* MERGE passes actionState of the action it's currently executing;
|
||||
* regular UPDATE passes NULL. This is used by ExecUpdate to know if it's
|
||||
* being called from MERGE or regular UPDATE operation. ExecUpdate may
|
||||
* pass this information to ExecInsert if it ends up running DELETE+INSERT
|
||||
* for partition key updates.
|
||||
*
|
||||
* If the UPDATE fails because the tuple is concurrently updated/deleted
|
||||
* by this or some other transaction, hufdp is filled with the reason as
|
||||
* well as other important information. Currently only MERGE needs this
|
||||
* information.
|
||||
*
|
||||
* Returns RETURNING result if any, otherwise NULL.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
extern TupleTableSlot *
|
||||
ExecUpdate(ModifyTableState *mtstate,
|
||||
ItemPointer tupleid,
|
||||
HeapTuple oldtuple,
|
||||
@ -922,6 +990,9 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
TupleTableSlot *planSlot,
|
||||
EPQState *epqstate,
|
||||
EState *estate,
|
||||
bool *tuple_updated,
|
||||
HeapUpdateFailureData *hufdp,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
@ -938,6 +1009,17 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
if (IsBootstrapProcessingMode())
|
||||
elog(ERROR, "cannot UPDATE during bootstrap");
|
||||
|
||||
if (tuple_updated)
|
||||
*tuple_updated = false;
|
||||
|
||||
/*
|
||||
* Initialize hufdp. Since the caller is only interested in the failure
|
||||
* status, initialize with the state that is used to indicate successful
|
||||
* operation.
|
||||
*/
|
||||
if (hufdp)
|
||||
hufdp->result = HeapTupleMayBeUpdated;
|
||||
|
||||
/*
|
||||
* get the heap tuple out of the tuple table slot, making sure we have a
|
||||
* writable copy
|
||||
@ -955,7 +1037,7 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
resultRelInfo->ri_TrigDesc->trig_update_before_row)
|
||||
{
|
||||
slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
|
||||
tupleid, oldtuple, slot);
|
||||
tupleid, oldtuple, slot, hufdp);
|
||||
|
||||
if (slot == NULL) /* "do nothing" */
|
||||
return NULL;
|
||||
@ -1001,7 +1083,6 @@ ExecUpdate(ModifyTableState *mtstate,
|
||||
}
|
||||
else
|
||||
{
|
||||
LockTupleMode lockmode;
|
||||
bool partition_constraint_failed;
|
||||
|
||||
/*
|
||||
@ -1079,8 +1160,9 @@ lreplace:;
|
||||
* Row movement, part 1. Delete the tuple, but skip RETURNING
|
||||
* processing. We want to return rows from INSERT.
|
||||
*/
|
||||
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate,
|
||||
&tuple_deleted, false, false);
|
||||
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,
|
||||
estate, &tuple_deleted, false, hufdp, NULL,
|
||||
false);
|
||||
|
||||
/*
|
||||
* For some reason if DELETE didn't happen (e.g. trigger prevented
|
||||
@ -1116,16 +1198,36 @@ lreplace:;
|
||||
saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
|
||||
|
||||
/*
|
||||
* resultRelInfo is one of the per-subplan resultRelInfos. So we
|
||||
* should convert the tuple into root's tuple descriptor, since
|
||||
* ExecInsert() starts the search from root. The tuple conversion
|
||||
* map list is in the order of mtstate->resultRelInfo[], so to
|
||||
* retrieve the one for this resultRel, we need to know the
|
||||
* position of the resultRel in mtstate->resultRelInfo[].
|
||||
* We should convert the tuple into root's tuple descriptor, since
|
||||
* ExecInsert() starts the search from root. To do that, we need to
|
||||
* retrieve the tuple conversion map for this resultRelInfo.
|
||||
*
|
||||
* If we're running MERGE then resultRelInfo is per-partition
|
||||
* resultRelInfo as initialized in ExecInitPartitionInfo(). Note
|
||||
* that we don't expand inheritance for the resultRelation in case
|
||||
* of MERGE and hence there is just one subplan. Whereas for
|
||||
* regular UPDATE, resultRelInfo is one of the per-subplan
|
||||
* resultRelInfos. In either case the position of this partition in
|
||||
* tracked in ri_PartitionLeafIndex;
|
||||
*
|
||||
* Retrieve the map either by looking at the resultRelInfo's
|
||||
* position in mtstate->resultRelInfo[] (for UPDATE) or by simply
|
||||
* using the ri_PartitionLeafIndex value (for MERGE).
|
||||
*/
|
||||
map_index = resultRelInfo - mtstate->resultRelInfo;
|
||||
Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
|
||||
tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
|
||||
if (mtstate->operation == CMD_MERGE)
|
||||
{
|
||||
map_index = resultRelInfo->ri_PartitionLeafIndex;
|
||||
Assert(mtstate->rootResultRelInfo == NULL);
|
||||
tupconv_map = TupConvMapForLeaf(proute,
|
||||
mtstate->resultRelInfo,
|
||||
map_index);
|
||||
}
|
||||
else
|
||||
{
|
||||
map_index = resultRelInfo - mtstate->resultRelInfo;
|
||||
Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
|
||||
tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
|
||||
}
|
||||
tuple = ConvertPartitionTupleSlot(tupconv_map,
|
||||
tuple,
|
||||
proute->root_tuple_slot,
|
||||
@ -1135,12 +1237,16 @@ lreplace:;
|
||||
* Prepare for tuple routing, making it look like we're inserting
|
||||
* into the root.
|
||||
*/
|
||||
Assert(mtstate->rootResultRelInfo != NULL);
|
||||
slot = ExecPrepareTupleRouting(mtstate, estate, proute,
|
||||
mtstate->rootResultRelInfo, slot);
|
||||
getTargetResultRelInfo(mtstate),
|
||||
slot);
|
||||
|
||||
ret_slot = ExecInsert(mtstate, slot, planSlot,
|
||||
estate, canSetTag);
|
||||
estate, actionState, canSetTag);
|
||||
|
||||
/* Update is successful. */
|
||||
if (tuple_updated)
|
||||
*tuple_updated = true;
|
||||
|
||||
/* Revert ExecPrepareTupleRouting's node change. */
|
||||
estate->es_result_relation_info = resultRelInfo;
|
||||
@ -1178,7 +1284,16 @@ lreplace:;
|
||||
estate->es_output_cid,
|
||||
estate->es_crosscheck_snapshot,
|
||||
true /* wait for commit */ ,
|
||||
&hufd, &lockmode);
|
||||
&hufd);
|
||||
|
||||
/*
|
||||
* Copy the necessary information, if the caller has asked for it. We
|
||||
* must do this irrespective of whether the tuple was updated or
|
||||
* deleted.
|
||||
*/
|
||||
if (hufdp)
|
||||
*hufdp = hufd;
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case HeapTupleSelfUpdated:
|
||||
@ -1223,26 +1338,42 @@ lreplace:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(tupleid, &hufd.ctid))
|
||||
{
|
||||
TupleTableSlot *epqslot;
|
||||
|
||||
/*
|
||||
* If we're executing MERGE, then the onus of running
|
||||
* EvalPlanQual() and handling its outcome lies with the
|
||||
* caller.
|
||||
*/
|
||||
if (actionState != NULL)
|
||||
return NULL;
|
||||
|
||||
/* Regular UPDATE path. */
|
||||
epqslot = EvalPlanQual(estate,
|
||||
epqstate,
|
||||
resultRelationDesc,
|
||||
resultRelInfo->ri_RangeTableIndex,
|
||||
lockmode,
|
||||
GetEPQRangeTableIndex(resultRelInfo),
|
||||
hufd.lockmode,
|
||||
&hufd.ctid,
|
||||
hufd.xmax);
|
||||
if (!TupIsNull(epqslot))
|
||||
{
|
||||
*tupleid = hufd.ctid;
|
||||
/* Normal UPDATE path */
|
||||
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
|
||||
tuple = ExecMaterializeSlot(slot);
|
||||
goto lreplace;
|
||||
}
|
||||
}
|
||||
/* tuple already deleted; nothing to do */
|
||||
|
||||
/*
|
||||
* tuple already deleted; nothing to do. But MERGE might want
|
||||
* to handle it differently. We've already filled-in hufdp
|
||||
* with sufficient information for MERGE to look at.
|
||||
*/
|
||||
return NULL;
|
||||
|
||||
default:
|
||||
@ -1271,6 +1402,9 @@ lreplace:;
|
||||
estate, false, NULL, NIL);
|
||||
}
|
||||
|
||||
if (tuple_updated)
|
||||
*tuple_updated = true;
|
||||
|
||||
if (canSetTag)
|
||||
(estate->es_processed)++;
|
||||
|
||||
@ -1365,9 +1499,9 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
* there's no historical behavior to break.
|
||||
*
|
||||
* It is the user's responsibility to prevent this situation from
|
||||
* occurring. These problems are why SQL-2003 similarly specifies
|
||||
* that for SQL MERGE, an exception must be raised in the event of
|
||||
* an attempt to update the same row twice.
|
||||
* occurring. These problems are why SQL Standard similarly
|
||||
* specifies that for SQL MERGE, an exception must be raised in
|
||||
* the event of an attempt to update the same row twice.
|
||||
*/
|
||||
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple.t_data)))
|
||||
ereport(ERROR,
|
||||
@ -1489,7 +1623,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
*returning = ExecUpdate(mtstate, &tuple.t_self, NULL,
|
||||
mtstate->mt_conflproj, planSlot,
|
||||
&mtstate->mt_epqstate, mtstate->ps.state,
|
||||
canSetTag);
|
||||
NULL, NULL, NULL, canSetTag);
|
||||
|
||||
ReleaseBuffer(buffer);
|
||||
return true;
|
||||
@ -1527,6 +1661,14 @@ fireBSTriggers(ModifyTableState *node)
|
||||
case CMD_DELETE:
|
||||
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
|
||||
break;
|
||||
case CMD_MERGE:
|
||||
if (node->mt_merge_subcommands & MERGE_INSERT)
|
||||
ExecBSInsertTriggers(node->ps.state, resultRelInfo);
|
||||
if (node->mt_merge_subcommands & MERGE_UPDATE)
|
||||
ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
|
||||
if (node->mt_merge_subcommands & MERGE_DELETE)
|
||||
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
@ -1582,6 +1724,17 @@ fireASTriggers(ModifyTableState *node)
|
||||
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
break;
|
||||
case CMD_MERGE:
|
||||
if (node->mt_merge_subcommands & MERGE_DELETE)
|
||||
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
if (node->mt_merge_subcommands & MERGE_UPDATE)
|
||||
ExecASUpdateTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
if (node->mt_merge_subcommands & MERGE_INSERT)
|
||||
ExecASInsertTriggers(node->ps.state, resultRelInfo,
|
||||
node->mt_transition_capture);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
@ -1644,7 +1797,7 @@ ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
|
||||
*
|
||||
* Returns a slot holding the tuple of the partition rowtype.
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
TupleTableSlot *
|
||||
ExecPrepareTupleRouting(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
PartitionTupleRouting *proute,
|
||||
@ -1967,6 +2120,7 @@ ExecModifyTable(PlanState *pstate)
|
||||
{
|
||||
/* advance to next subplan if any */
|
||||
node->mt_whichplan++;
|
||||
|
||||
if (node->mt_whichplan < node->mt_nplans)
|
||||
{
|
||||
resultRelInfo++;
|
||||
@ -2015,6 +2169,12 @@ ExecModifyTable(PlanState *pstate)
|
||||
EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
|
||||
slot = planSlot;
|
||||
|
||||
if (operation == CMD_MERGE)
|
||||
{
|
||||
ExecMerge(node, estate, slot, junkfilter, resultRelInfo);
|
||||
continue;
|
||||
}
|
||||
|
||||
tupleid = NULL;
|
||||
oldtuple = NULL;
|
||||
if (junkfilter != NULL)
|
||||
@ -2096,19 +2256,20 @@ ExecModifyTable(PlanState *pstate)
|
||||
slot = ExecPrepareTupleRouting(node, estate, proute,
|
||||
resultRelInfo, slot);
|
||||
slot = ExecInsert(node, slot, planSlot,
|
||||
estate, node->canSetTag);
|
||||
estate, NULL, node->canSetTag);
|
||||
/* Revert ExecPrepareTupleRouting's state change. */
|
||||
if (proute)
|
||||
estate->es_result_relation_info = resultRelInfo;
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
slot = ExecUpdate(node, tupleid, oldtuple, slot, planSlot,
|
||||
&node->mt_epqstate, estate, node->canSetTag);
|
||||
&node->mt_epqstate, estate,
|
||||
NULL, NULL, NULL, node->canSetTag);
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
slot = ExecDelete(node, tupleid, oldtuple, planSlot,
|
||||
&node->mt_epqstate, estate,
|
||||
NULL, true, node->canSetTag);
|
||||
NULL, true, NULL, NULL, node->canSetTag);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
@ -2198,6 +2359,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
saved_resultRelInfo = estate->es_result_relation_info;
|
||||
|
||||
resultRelInfo = mtstate->resultRelInfo;
|
||||
|
||||
/*
|
||||
* mergeTargetRelation must be set if we're running MERGE and mustn't be
|
||||
* set if we're not.
|
||||
*/
|
||||
Assert(operation != CMD_MERGE || node->mergeTargetRelation > 0);
|
||||
Assert(operation == CMD_MERGE || node->mergeTargetRelation == 0);
|
||||
|
||||
resultRelInfo->ri_mergeTargetRTI = node->mergeTargetRelation;
|
||||
|
||||
i = 0;
|
||||
foreach(l, node->plans)
|
||||
{
|
||||
@ -2276,7 +2447,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
* partition key.
|
||||
*/
|
||||
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
|
||||
(operation == CMD_INSERT || update_tuple_routing_needed))
|
||||
(operation == CMD_INSERT || operation == CMD_MERGE ||
|
||||
update_tuple_routing_needed))
|
||||
mtstate->mt_partition_tuple_routing =
|
||||
ExecSetupPartitionTupleRouting(mtstate, rel);
|
||||
|
||||
@ -2287,6 +2459,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
|
||||
ExecSetupTransitionCaptureState(mtstate, estate);
|
||||
|
||||
/*
|
||||
* If we are doing MERGE then setup child-parent mapping. This will be
|
||||
* required in case we end up doing a partition-key update, triggering a
|
||||
* tuple routing.
|
||||
*/
|
||||
if (mtstate->operation == CMD_MERGE &&
|
||||
mtstate->mt_partition_tuple_routing != NULL)
|
||||
ExecSetupChildParentMapForLeaf(mtstate->mt_partition_tuple_routing);
|
||||
|
||||
/*
|
||||
* Construct mapping from each of the per-subplan partition attnos to the
|
||||
* root attno. This is required when during update row movement the tuple
|
||||
@ -2478,6 +2659,106 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
}
|
||||
}
|
||||
|
||||
resultRelInfo = mtstate->resultRelInfo;
|
||||
|
||||
if (node->mergeActionList)
|
||||
{
|
||||
ListCell *l;
|
||||
ExprContext *econtext;
|
||||
List *mergeMatchedActionStates = NIL;
|
||||
List *mergeNotMatchedActionStates = NIL;
|
||||
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
|
||||
|
||||
mtstate->mt_merge_subcommands = 0;
|
||||
|
||||
if (mtstate->ps.ps_ExprContext == NULL)
|
||||
ExecAssignExprContext(estate, &mtstate->ps);
|
||||
|
||||
econtext = mtstate->ps.ps_ExprContext;
|
||||
|
||||
/* initialize slot for the existing tuple */
|
||||
Assert(mtstate->mt_existing == NULL);
|
||||
mtstate->mt_existing =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/* initialize slot for merge actions */
|
||||
Assert(mtstate->mt_mergeproj == NULL);
|
||||
mtstate->mt_mergeproj =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/*
|
||||
* Create a MergeActionState for each action on the mergeActionList
|
||||
* and add it to either a list of matched actions or not-matched
|
||||
* actions.
|
||||
*/
|
||||
foreach(l, node->mergeActionList)
|
||||
{
|
||||
MergeAction *action = (MergeAction *) lfirst(l);
|
||||
MergeActionState *action_state = makeNode(MergeActionState);
|
||||
TupleDesc tupDesc;
|
||||
|
||||
action_state->matched = action->matched;
|
||||
action_state->commandType = action->commandType;
|
||||
action_state->whenqual = ExecInitQual((List *) action->qual,
|
||||
&mtstate->ps);
|
||||
|
||||
/* create target slot for this action's projection */
|
||||
tupDesc = ExecTypeFromTL((List *) action->targetList,
|
||||
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
|
||||
action_state->tupDesc = tupDesc;
|
||||
|
||||
/* build action projection state */
|
||||
action_state->proj =
|
||||
ExecBuildProjectionInfo(action->targetList, econtext,
|
||||
mtstate->mt_mergeproj, &mtstate->ps,
|
||||
resultRelInfo->ri_RelationDesc->rd_att);
|
||||
|
||||
/*
|
||||
* We create two lists - one for WHEN MATCHED actions and one
|
||||
* for WHEN NOT MATCHED actions - and stick the
|
||||
* MergeActionState into the appropriate list.
|
||||
*/
|
||||
if (action_state->matched)
|
||||
mergeMatchedActionStates =
|
||||
lappend(mergeMatchedActionStates, action_state);
|
||||
else
|
||||
mergeNotMatchedActionStates =
|
||||
lappend(mergeNotMatchedActionStates, action_state);
|
||||
|
||||
switch (action->commandType)
|
||||
{
|
||||
case CMD_INSERT:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_INSERT;
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
mtstate->mt_merge_subcommands |= MERGE_DELETE;
|
||||
break;
|
||||
case CMD_NOTHING:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
}
|
||||
|
||||
resultRelInfo->ri_mergeState->matchedActionStates =
|
||||
mergeMatchedActionStates;
|
||||
resultRelInfo->ri_mergeState->notMatchedActionStates =
|
||||
mergeNotMatchedActionStates;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* select first subplan */
|
||||
mtstate->mt_whichplan = 0;
|
||||
subplan = (Plan *) linitial(node->plans);
|
||||
@ -2491,7 +2772,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
* --- no need to look first. Typically, this will be a 'ctid' or
|
||||
* 'wholerow' attribute, but in the case of a foreign data wrapper it
|
||||
* might be a set of junk attributes sufficient to identify the remote
|
||||
* row.
|
||||
* row. We follow this logic for MERGE, so it always has a junk attributes.
|
||||
*
|
||||
* If there are multiple result relations, each one needs its own junk
|
||||
* filter. Note multiple rels are only possible for UPDATE/DELETE, so we
|
||||
@ -2519,6 +2800,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
case CMD_DELETE:
|
||||
case CMD_MERGE:
|
||||
junk_filter_needed = true;
|
||||
break;
|
||||
default:
|
||||
@ -2534,6 +2816,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
JunkFilter *j;
|
||||
|
||||
subplan = mtstate->mt_plans[i]->plan;
|
||||
|
||||
if (operation == CMD_INSERT || operation == CMD_UPDATE)
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
subplan->targetlist);
|
||||
@ -2542,7 +2825,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
resultRelInfo->ri_RelationDesc->rd_att->tdhasoid,
|
||||
ExecInitExtraTupleSlot(estate, NULL));
|
||||
|
||||
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||
if (operation == CMD_UPDATE ||
|
||||
operation == CMD_DELETE ||
|
||||
operation == CMD_MERGE)
|
||||
{
|
||||
/* For UPDATE/DELETE, find the appropriate junk attr now */
|
||||
char relkind;
|
||||
@ -2555,6 +2840,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid");
|
||||
if (!AttributeNumberIsValid(j->jf_junkAttNo))
|
||||
elog(ERROR, "could not find junk ctid column");
|
||||
|
||||
if (operation == CMD_MERGE &&
|
||||
relkind == RELKIND_PARTITIONED_TABLE)
|
||||
{
|
||||
j->jf_otherJunkAttNo = ExecFindJunkAttribute(j, "tableoid");
|
||||
if (!AttributeNumberIsValid(j->jf_otherJunkAttNo))
|
||||
elog(ERROR, "could not find junk tableoid column");
|
||||
|
||||
}
|
||||
}
|
||||
else if (relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
|
@ -2420,6 +2420,9 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
|
||||
else
|
||||
res = SPI_OK_UPDATE;
|
||||
break;
|
||||
case CMD_MERGE:
|
||||
res = SPI_OK_MERGE;
|
||||
break;
|
||||
default:
|
||||
return SPI_ERROR_OPUNKNOWN;
|
||||
}
|
||||
|
Reference in New Issue
Block a user