1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-02 04:21:28 +03:00

Add RETURNING support to MERGE.

This allows a RETURNING clause to be appended to a MERGE query, to
return values based on each row inserted, updated, or deleted. As with
plain INSERT, UPDATE, and DELETE commands, the returned values are
based on the new contents of the target table for INSERT and UPDATE
actions, and on its old contents for DELETE actions. Values from the
source relation may also be returned.

As with INSERT/UPDATE/DELETE, the output of MERGE ... RETURNING may be
used as the source relation for other operations such as WITH queries
and COPY commands.

Additionally, a special function merge_action() is provided, which
returns 'INSERT', 'UPDATE', or 'DELETE', depending on the action
executed for each row. The merge_action() function can be used
anywhere in the RETURNING list, including in arbitrary expressions and
subqueries, but it is an error to use it anywhere outside of a MERGE
query's RETURNING list.

Dean Rasheed, reviewed by Isaac Morland, Vik Fearing, Alvaro Herrera,
Gurjeet Singh, Jian He, Jeff Davis, Merlin Moncure, Peter Eisentraut,
and Wolfgang Walther.

Discussion: http://postgr.es/m/CAEZATCWePEGQR5LBn-vD6SfeLZafzEm2Qy_L_Oky2=qw2w3Pzg@mail.gmail.com
This commit is contained in:
Dean Rasheed
2024-03-17 13:58:59 +00:00
parent 6a004f1be8
commit c649fa24a4
61 changed files with 1198 additions and 216 deletions

View File

@@ -1107,6 +1107,19 @@ ExecInitExprRec(Expr *node, ExprState *state,
break;
}
case T_MergeSupportFunc:
{
/* must be in a MERGE, else something messed up */
if (!state->parent ||
!IsA(state->parent, ModifyTableState) ||
((ModifyTableState *) state->parent)->operation != CMD_MERGE)
elog(ERROR, "MergeSupportFunc found in non-merge plan node");
scratch.opcode = EEOP_MERGE_SUPPORT_FUNC;
ExprEvalPushStep(state, &scratch);
break;
}
case T_SubscriptingRef:
{
SubscriptingRef *sbsref = (SubscriptingRef *) node;

View File

@@ -484,6 +484,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
&&CASE_EEOP_AGGREF,
&&CASE_EEOP_GROUPING_FUNC,
&&CASE_EEOP_WINDOW_FUNC,
&&CASE_EEOP_MERGE_SUPPORT_FUNC,
&&CASE_EEOP_SUBPLAN,
&&CASE_EEOP_AGG_STRICT_DESERIALIZE,
&&CASE_EEOP_AGG_DESERIALIZE,
@@ -1592,6 +1593,14 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_NEXT();
}
EEO_CASE(EEOP_MERGE_SUPPORT_FUNC)
{
/* too complex/uncommon for an inline implementation */
ExecEvalMergeSupportFunc(state, op, econtext);
EEO_NEXT();
}
EEO_CASE(EEOP_SUBPLAN)
{
/* too complex for an inline implementation */
@@ -4245,6 +4254,45 @@ ExecEvalGroupingFunc(ExprState *state, ExprEvalStep *op)
*op->resnull = false;
}
/*
* ExecEvalMergeSupportFunc
*
* Returns information about the current MERGE action for its RETURNING list.
*/
void
ExecEvalMergeSupportFunc(ExprState *state, ExprEvalStep *op,
ExprContext *econtext)
{
ModifyTableState *mtstate = castNode(ModifyTableState, state->parent);
MergeActionState *relaction = mtstate->mt_merge_action;
if (!relaction)
elog(ERROR, "no merge action in progress");
/* Return the MERGE action ("INSERT", "UPDATE", or "DELETE") */
switch (relaction->mas_action->commandType)
{
case CMD_INSERT:
*op->resvalue = PointerGetDatum(cstring_to_text_with_len("INSERT", 6));
*op->resnull = false;
break;
case CMD_UPDATE:
*op->resvalue = PointerGetDatum(cstring_to_text_with_len("UPDATE", 6));
*op->resnull = false;
break;
case CMD_DELETE:
*op->resvalue = PointerGetDatum(cstring_to_text_with_len("DELETE", 6));
*op->resnull = false;
break;
case CMD_NOTHING:
elog(ERROR, "unexpected merge action: DO NOTHING");
break;
default:
elog(ERROR, "unrecognized commandType: %d",
(int) relaction->mas_action->commandType);
}
}
/*
* Hand off evaluation of a subplan to nodeSubplan.c
*/

View File

@@ -609,8 +609,8 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
* Build the RETURNING projection for the partition. Note that we didn't
* build the returningList for partitions within the planner, but simple
* translation of varattnos will suffice. This only occurs for the INSERT
* case or in the case of UPDATE tuple routing where we didn't find a
* result rel to reuse.
* case or in the case of UPDATE/MERGE tuple routing where we didn't find
* a result rel to reuse.
*/
if (node && node->returningLists != NIL)
{
@@ -619,11 +619,13 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
List *returningList;
/* See the comment above for WCO lists. */
/* (except no RETURNING support for MERGE yet) */
Assert((node->operation == CMD_INSERT &&
list_length(node->returningLists) == 1 &&
list_length(node->resultRelations) == 1) ||
(node->operation == CMD_UPDATE &&
list_length(node->returningLists) ==
list_length(node->resultRelations)) ||
(node->operation == CMD_MERGE &&
list_length(node->returningLists) ==
list_length(node->resultRelations)));

View File

@@ -1662,8 +1662,8 @@ check_sql_fn_retval(List *queryTreeLists,
/*
* If it's a plain SELECT, it returns whatever the targetlist says.
* Otherwise, if it's INSERT/UPDATE/DELETE with RETURNING, it returns
* that. Otherwise, the function return type must be VOID.
* Otherwise, if it's INSERT/UPDATE/DELETE/MERGE with RETURNING, it
* returns that. Otherwise, the function return type must be VOID.
*
* Note: eventually replace this test with QueryReturnsTuples? We'd need
* a more general method of determining the output type, though. Also, it
@@ -1681,7 +1681,8 @@ check_sql_fn_retval(List *queryTreeLists,
else if (parse &&
(parse->commandType == CMD_INSERT ||
parse->commandType == CMD_UPDATE ||
parse->commandType == CMD_DELETE) &&
parse->commandType == CMD_DELETE ||
parse->commandType == CMD_MERGE) &&
parse->returningList)
{
tlist = parse->returningList;
@@ -1695,7 +1696,7 @@ check_sql_fn_retval(List *queryTreeLists,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("return type mismatch in function declared to return %s",
format_type_be(rettype)),
errdetail("Function's final statement must be SELECT or INSERT/UPDATE/DELETE RETURNING.")));
errdetail("Function's final statement must be SELECT or INSERT/UPDATE/DELETE/MERGE RETURNING.")));
return false; /* keep compiler quiet */
}

View File

@@ -36,8 +36,7 @@
* RETURNING tuple after completing each row insert, update, or delete.
* It must be called again to continue the operation. Without RETURNING,
* we just loop within the node until all the work is done, then
* return NULL. This avoids useless call/return overhead. (MERGE does
* not support RETURNING.)
* return NULL. This avoids useless call/return overhead.
*/
#include "postgres.h"
@@ -85,9 +84,6 @@ typedef struct ModifyTableContext
*/
TupleTableSlot *planSlot;
/* MERGE specific */
MergeActionState *relaction; /* MERGE action in progress */
/*
* Information about the changes that were made concurrently to a tuple
* being updated or deleted
@@ -150,14 +146,15 @@ static TupleTableSlot *ExecMerge(ModifyTableContext *context,
HeapTuple oldtuple,
bool canSetTag);
static void ExecInitMerge(ModifyTableState *mtstate, EState *estate);
static bool ExecMergeMatched(ModifyTableContext *context,
ResultRelInfo *resultRelInfo,
ItemPointer tupleid,
HeapTuple oldtuple,
bool canSetTag);
static void ExecMergeNotMatched(ModifyTableContext *context,
ResultRelInfo *resultRelInfo,
bool canSetTag);
static TupleTableSlot *ExecMergeMatched(ModifyTableContext *context,
ResultRelInfo *resultRelInfo,
ItemPointer tupleid,
HeapTuple oldtuple,
bool canSetTag,
bool *matched);
static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context,
ResultRelInfo *resultRelInfo,
bool canSetTag);
/*
@@ -977,7 +974,7 @@ ExecInsert(ModifyTableContext *context,
if (mtstate->operation == CMD_UPDATE)
wco_kind = WCO_RLS_UPDATE_CHECK;
else if (mtstate->operation == CMD_MERGE)
wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ?
wco_kind = (mtstate->mt_merge_action->mas_action->commandType == CMD_UPDATE) ?
WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
else
wco_kind = WCO_RLS_INSERT_CHECK;
@@ -1831,7 +1828,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
* additional rechecking, and might end up executing a different
* action entirely).
*/
if (context->relaction != NULL)
if (mtstate->operation == CMD_MERGE)
return *tmresult == TM_Ok;
else if (TupIsNull(epqslot))
return true;
@@ -2072,7 +2069,7 @@ lreplace:
* No luck, a retry is needed. If running MERGE, we do not do so
* here; instead let it handle that on its own rules.
*/
if (context->relaction != NULL)
if (context->mtstate->operation == CMD_MERGE)
return result;
/*
@@ -2713,6 +2710,7 @@ static TupleTableSlot *
ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag)
{
TupleTableSlot *rslot = NULL;
bool matched;
/*-----
@@ -2761,19 +2759,18 @@ ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
*/
matched = tupleid != NULL || oldtuple != NULL;
if (matched)
matched = ExecMergeMatched(context, resultRelInfo, tupleid, oldtuple,
canSetTag);
rslot = ExecMergeMatched(context, resultRelInfo, tupleid, oldtuple,
canSetTag, &matched);
/*
* Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched()
* returned "false", indicating the previously MATCHED tuple no longer
* matches.
* Deal with the NOT MATCHED case (either a NOT MATCHED tuple from the
* join, or a previously MATCHED tuple for which ExecMergeMatched() set
* "matched" to false, indicating that it no longer matches).
*/
if (!matched)
ExecMergeNotMatched(context, resultRelInfo, canSetTag);
rslot = ExecMergeNotMatched(context, resultRelInfo, canSetTag);
/* No RETURNING support yet */
return NULL;
return rslot;
}
/*
@@ -2785,8 +2782,8 @@ ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
* We start from the first WHEN MATCHED action and check if the WHEN quals
* pass, if any. If the WHEN quals for the first action do not pass, we
* check the second, then the third and so on. If we reach to the end, no
* action is taken and we return true, indicating that no further action is
* required for this tuple.
* action is taken and "matched" is set to true, indicating that no further
* action is required for this tuple.
*
* If we do find a qualifying action, then we attempt to execute the action.
*
@@ -2795,16 +2792,18 @@ ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
* with individual actions are evaluated by this routine via ExecQual, while
* EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the
* updated tuple still passes the join quals, then we restart from the first
* action to look for a qualifying action. Otherwise, we return false --
* meaning that a NOT MATCHED action must now be executed for the current
* source tuple.
* action to look for a qualifying action. Otherwise, "matched" is set to
* false -- meaning that a NOT MATCHED action must now be executed for the
* current source tuple.
*/
static bool
static TupleTableSlot *
ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag)
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag,
bool *matched)
{
ModifyTableState *mtstate = context->mtstate;
TupleTableSlot *newslot;
TupleTableSlot *newslot = NULL;
TupleTableSlot *rslot = NULL;
EState *estate = context->estate;
ExprContext *econtext = mtstate->ps.ps_ExprContext;
bool isNull;
@@ -2815,7 +2814,10 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
* If there are no WHEN MATCHED actions, we are done.
*/
if (resultRelInfo->ri_matchedMergeAction == NIL)
return true;
{
*matched = true;
return NULL;
}
/*
* Make tuple and any needed join variables available to ExecQual and
@@ -2905,12 +2907,15 @@ lmerge_matched:
*/
newslot = ExecProject(relaction->mas_proj);
context->relaction = relaction;
mtstate->mt_merge_action = relaction;
if (!ExecUpdatePrologue(context, resultRelInfo,
tupleid, NULL, newslot, &result))
{
if (result == TM_Ok)
return true; /* "do nothing" */
{
*matched = true;
return NULL; /* "do nothing" */
}
break; /* concurrent update/delete */
}
@@ -2920,7 +2925,10 @@ lmerge_matched:
{
if (!ExecIRUpdateTriggers(estate, resultRelInfo,
oldtuple, newslot))
return true; /* "do nothing" */
{
*matched = true;
return NULL; /* "do nothing" */
}
}
else
{
@@ -2933,12 +2941,15 @@ lmerge_matched:
* cross-partition update was done, then there's nothing
* else for us to do --- the UPDATE has been turned into a
* DELETE and an INSERT, and we must not perform any of
* the usual post-update tasks.
* the usual post-update tasks. Also, the RETURNING tuple
* (if any) has been projected, so we can just return
* that.
*/
if (updateCxt.crossPartUpdate)
{
mtstate->mt_merge_updated += 1;
return true;
*matched = true;
return context->cpUpdateReturningSlot;
}
}
@@ -2951,12 +2962,15 @@ lmerge_matched:
break;
case CMD_DELETE:
context->relaction = relaction;
mtstate->mt_merge_action = relaction;
if (!ExecDeletePrologue(context, resultRelInfo, tupleid,
NULL, NULL, &result))
{
if (result == TM_Ok)
return true; /* "do nothing" */
{
*matched = true;
return NULL; /* "do nothing" */
}
break; /* concurrent update/delete */
}
@@ -2966,7 +2980,10 @@ lmerge_matched:
{
if (!ExecIRDeleteTriggers(estate, resultRelInfo,
oldtuple))
return true; /* "do nothing" */
{
*matched = true;
return NULL; /* "do nothing" */
}
}
else
result = ExecDeleteAct(context, resultRelInfo, tupleid,
@@ -3046,7 +3063,8 @@ lmerge_matched:
* If the tuple was already deleted, return to let caller
* handle it under NOT MATCHED clauses.
*/
return false;
*matched = false;
return NULL;
case TM_Updated:
{
@@ -3092,13 +3110,19 @@ lmerge_matched:
* NOT MATCHED actions.
*/
if (TupIsNull(epqslot))
return false;
{
*matched = false;
return NULL;
}
(void) ExecGetJunkAttribute(epqslot,
resultRelInfo->ri_RowIdAttNo,
&isNull);
if (isNull)
return false;
{
*matched = false;
return NULL;
}
/*
* When a tuple was updated and migrated to
@@ -3133,7 +3157,8 @@ lmerge_matched:
* tuple already deleted; tell caller to run NOT
* MATCHED actions
*/
return false;
*matched = false;
return NULL;
case TM_SelfModified:
@@ -3161,13 +3186,13 @@ lmerge_matched:
/* This shouldn't happen */
elog(ERROR, "attempted to update or delete invisible tuple");
return false;
return NULL;
default:
/* see table_tuple_lock call in ExecDelete() */
elog(ERROR, "unexpected table_tuple_lock status: %u",
result);
return false;
return NULL;
}
}
@@ -3179,6 +3204,31 @@ lmerge_matched:
break;
}
/* Process RETURNING if present */
if (resultRelInfo->ri_projectReturning)
{
switch (commandType)
{
case CMD_UPDATE:
rslot = ExecProcessReturning(resultRelInfo, newslot,
context->planSlot);
break;
case CMD_DELETE:
rslot = ExecProcessReturning(resultRelInfo,
resultRelInfo->ri_oldTupleSlot,
context->planSlot);
break;
case CMD_NOTHING:
break;
default:
elog(ERROR, "unrecognized commandType: %d",
(int) commandType);
}
}
/*
* We've activated one of the WHEN clauses, so we don't search
* further. This is required behaviour, not an optimization.
@@ -3189,19 +3239,22 @@ lmerge_matched:
/*
* Successfully executed an action or no qualifying action was found.
*/
return true;
*matched = true;
return rslot;
}
/*
* Execute the first qualifying NOT MATCHED action.
*/
static void
static TupleTableSlot *
ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
bool canSetTag)
{
ModifyTableState *mtstate = context->mtstate;
ExprContext *econtext = mtstate->ps.ps_ExprContext;
List *actionStates = NIL;
TupleTableSlot *rslot = NULL;
ListCell *l;
/*
@@ -3251,10 +3304,10 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
* so we don't need to map the tuple here.
*/
newslot = ExecProject(action->mas_proj);
context->relaction = action;
mtstate->mt_merge_action = action;
(void) ExecInsert(context, mtstate->rootResultRelInfo, newslot,
canSetTag, NULL, NULL);
rslot = ExecInsert(context, mtstate->rootResultRelInfo,
newslot, canSetTag, NULL, NULL);
mtstate->mt_merge_inserted += 1;
break;
case CMD_NOTHING:
@@ -3270,6 +3323,8 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
*/
break;
}
return rslot;
}
/*
@@ -3732,9 +3787,17 @@ ExecModifyTable(PlanState *pstate)
{
EvalPlanQualSetSlot(&node->mt_epqstate, context.planSlot);
ExecMerge(&context, node->resultRelInfo, NULL, NULL,
node->canSetTag);
continue; /* no RETURNING support yet */
slot = ExecMerge(&context, node->resultRelInfo,
NULL, NULL, node->canSetTag);
/*
* If we got a RETURNING result, return it to the caller.
* We'll continue the work on next call.
*/
if (slot)
return slot;
continue; /* continue with the next tuple */
}
elog(ERROR, "tableoid is NULL");
@@ -3811,9 +3874,17 @@ ExecModifyTable(PlanState *pstate)
{
EvalPlanQualSetSlot(&node->mt_epqstate, context.planSlot);
ExecMerge(&context, node->resultRelInfo, NULL, NULL,
node->canSetTag);
continue; /* no RETURNING support yet */
slot = ExecMerge(&context, node->resultRelInfo,
NULL, NULL, node->canSetTag);
/*
* If we got a RETURNING result, return it to the
* caller. We'll continue the work on next call.
*/
if (slot)
return slot;
continue; /* continue with the next tuple */
}
elog(ERROR, "ctid is NULL");
@@ -3860,9 +3931,17 @@ ExecModifyTable(PlanState *pstate)
{
EvalPlanQualSetSlot(&node->mt_epqstate, context.planSlot);
ExecMerge(&context, node->resultRelInfo, NULL, NULL,
node->canSetTag);
continue; /* no RETURNING support yet */
slot = ExecMerge(&context, node->resultRelInfo,
NULL, NULL, node->canSetTag);
/*
* If we got a RETURNING result, return it to the
* caller. We'll continue the work on next call.
*/
if (slot)
return slot;
continue; /* continue with the next tuple */
}
elog(ERROR, "wholerow is NULL");
@@ -3924,7 +4003,6 @@ ExecModifyTable(PlanState *pstate)
}
slot = ExecGetUpdateNewTuple(resultRelInfo, context.planSlot,
oldSlot);
context.relaction = NULL;
/* Now apply the update. */
slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,

View File

@@ -2032,6 +2032,8 @@ SPI_result_code_string(int code)
return "SPI_OK_TD_REGISTER";
case SPI_OK_MERGE:
return "SPI_OK_MERGE";
case SPI_OK_MERGE_RETURNING:
return "SPI_OK_MERGE_RETURNING";
}
/* Unrecognized code ... return something useful ... */
sprintf(buf, "Unrecognized SPI code %d", code);
@@ -2885,7 +2887,10 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
res = SPI_OK_UPDATE;
break;
case CMD_MERGE:
res = SPI_OK_MERGE;
if (queryDesc->plannedstmt->hasReturning)
res = SPI_OK_MERGE_RETURNING;
else
res = SPI_OK_MERGE;
break;
default:
return SPI_ERROR_OPUNKNOWN;