mirror of https://github.com/postgres/postgres.git synced 2025-06-30 21:42:05 +03:00

Add support for MERGE SQL command

MERGE performs actions that modify rows in the target table using a
source table or query. MERGE provides a single SQL statement that can
conditionally INSERT/UPDATE/DELETE rows -- a task that would otherwise
require multiple PL statements.  For example,

MERGE INTO target AS t
USING source AS s
ON t.tid = s.sid
WHEN MATCHED AND t.balance > s.delta THEN
  UPDATE SET balance = t.balance - s.delta
WHEN MATCHED THEN
  DELETE
WHEN NOT MATCHED AND s.delta > 0 THEN
  INSERT VALUES (s.sid, s.delta)
WHEN NOT MATCHED THEN
  DO NOTHING;
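The first-match behaviour of the example can be modelled outside the database. The following is a minimal C sketch, not PostgreSQL code: `TargetRow`, `SourceRow`, `MergeOutcome` and `merge_one` are hypothetical names made up for illustration, and the sketch assumes that the target table has exactly the columns (tid, balance) and that at most one target row matches each source row.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the example above; these names do not exist in PostgreSQL. */
typedef enum { ACT_NOTHING, ACT_UPDATE, ACT_DELETE, ACT_INSERT } MergeOutcome;

typedef struct { int tid; int balance; } TargetRow;   /* assumed target columns */
typedef struct { int sid; int delta; } SourceRow;

/*
 * Decide what the example MERGE does for one source row.
 * "t" is the target row with t.tid = s.sid, or NULL when there is none.
 * WHEN clauses are tried in the order written; the first one whose
 * condition holds is used.  For UPDATE and INSERT, *new_balance receives
 * the balance stored in the target table afterwards.
 */
MergeOutcome
merge_one(const TargetRow *t, const SourceRow *s, int *new_balance)
{
    assert(s != NULL && new_balance != NULL);

    if (t != NULL)
    {
        /* WHEN MATCHED AND t.balance > s.delta THEN UPDATE ... */
        if (t->balance > s->delta)
        {
            *new_balance = t->balance - s->delta;
            return ACT_UPDATE;
        }
        /* WHEN MATCHED THEN DELETE */
        return ACT_DELETE;
    }

    /* WHEN NOT MATCHED AND s.delta > 0 THEN INSERT VALUES (s.sid, s.delta) */
    if (s->delta > 0)
    {
        *new_balance = s->delta;
        return ACT_INSERT;
    }

    /* WHEN NOT MATCHED THEN DO NOTHING */
    return ACT_NOTHING;
}
```

In this model, a target row with balance 100 and a source delta of 30 yields an UPDATE to balance 70, while a delta of 100 fails the first condition and falls through to DELETE.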

MERGE works with regular tables, partitioned tables and inheritance
hierarchies, including column and row security enforcement, as well as
support for row and statement triggers and transition tables therein.

MERGE is optimized for OLTP and is parameterizable, though it is also useful
for large-scale ETL/ELT. MERGE is not intended to be used in preference
to existing single SQL commands for INSERT, UPDATE or DELETE since there
is some overhead.  MERGE can be used from PL/pgSQL.

MERGE does not support targeting updatable views or foreign tables, and
RETURNING clauses are not allowed either.  These limitations are likely
fixable with sufficient effort.  Rewrite rules are also not supported,
but it's not clear that we'd want to support them.

Author: Pavan Deolasee <pavan.deolasee@gmail.com>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Author: Amit Langote <amitlangote09@gmail.com>
Author: Simon Riggs <simon.riggs@enterprisedb.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@enterprisedb.com>
Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions)
Reviewed-by: Peter Geoghegan <pg@bowt.ie> (earlier versions)
Reviewed-by: Robert Haas <robertmhaas@gmail.com> (earlier versions)
Reviewed-by: Japin Li <japinli@hotmail.com>
Reviewed-by: Justin Pryzby <pryzby@telsasoft.com>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Discussion: https://postgr.es/m/CANP8+jKitBSrB7oTgT9CY2i1ObfOt36z0XMraQc+Xrz8QB0nXA@mail.gmail.com
Discussion: https://postgr.es/m/CAH2-WzkJdBuxj9PO=2QaO9-3h3xGbQPZ34kJH=HukRekwM-GZg@mail.gmail.com
Discussion: https://postgr.es/m/20201231134736.GA25392@alvherre.pgsql
This commit is contained in:
Alvaro Herrera
2022-03-28 16:45:58 +02:00
parent ae63017bdb
commit 7103ebb7aa
95 changed files with 8726 additions and 167 deletions

View File

@ -39,7 +39,7 @@ columns, combine the values into a new row, and apply the update. (For a
heap table, the row-identity junk column is a CTID, but other things may
be used for other table types.) For DELETE, the plan tree need only deliver
junk row-identity column(s), and the ModifyTable node visits each of those
-rows and marks the row deleted.
+rows and marks the row deleted. MERGE is described below.
XXX a great deal more documentation needs to be written here...
@ -223,6 +223,45 @@ fast-path step types (EEOP_ASSIGN_*_VAR) to handle targetlist entries that
are simple Vars using only one step instead of two.
MERGE
-----
MERGE is a multiple-table, multiple-action command: It specifies a target
table and a source relation, and can contain multiple WHEN MATCHED and
WHEN NOT MATCHED clauses, each of which specifies one INSERT, UPDATE,
DELETE, or DO NOTHING action. The target table is modified by MERGE,
and the source relation supplies additional data for the actions. Each action
optionally specifies a qualifying expression that is evaluated for each tuple.
In the planner, transform_MERGE_to_join constructs a join between the target
table and the source relation, with row-identifying junk columns from the target
table. This join is an outer join if the MERGE command contains any WHEN NOT
MATCHED clauses; the ModifyTable node fetches tuples from the plan tree of that
join. If the row-identifying columns in the fetched tuple are NULL, then the
source relation contains a tuple that is not matched by any tuples in the
target table, so the qualifying expression for each WHEN NOT MATCHED clause is
evaluated given that tuple as returned by the plan. If the expression returns
true, the action indicated by the clause is executed, and no further clauses
are evaluated. On the other hand, if the row-identifying columns are not
NULL, then the matching tuple from the target table can be fetched; the
qualifying expression of each WHEN MATCHED clause is evaluated given both the fetched
tuple and the tuple returned by the plan.
If no WHEN NOT MATCHED clauses are present, then the join constructed by
the planner is an inner join, and the row-identifying junk columns are
always non-NULL.
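The relationship between the join type and the NULL test on the row-identifying columns can be sketched in plain C. This is only an illustrative model under simplifying assumptions: the structures, `ROWID_NULL` and the functions `merge_join` and `row_is_matched` are hypothetical, a single integer stands in for the row identity (a CTID for heap tables), and a nested loop stands in for whatever join the planner would actually choose.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; PostgreSQL's real structures are far richer. */
typedef struct { int rowid; int key; } TargetTuple;   /* rowid models the row identity */
typedef struct { int key; } SourceTuple;

#define ROWID_NULL (-1)     /* models a NULL row-identifying junk column */

typedef struct { int target_rowid; int source_key; } JoinRow;

/*
 * Join the source to the target on "key".  When the command has a
 * WHEN NOT MATCHED clause (want_not_matched), unmatched source tuples are
 * kept and carry ROWID_NULL, as in an outer join; otherwise they are
 * dropped, as in an inner join.  Returns the number of rows written to out.
 */
size_t
merge_join(const TargetTuple *target, size_t ntarget,
           const SourceTuple *source, size_t nsource,
           bool want_not_matched, JoinRow *out, size_t out_cap)
{
    size_t n = 0;

    for (size_t i = 0; i < nsource; i++)
    {
        bool matched = false;

        for (size_t j = 0; j < ntarget; j++)
        {
            if (target[j].key == source[i].key)
            {
                assert(n < out_cap);
                out[n].target_rowid = target[j].rowid;
                out[n].source_key = source[i].key;
                n++;
                matched = true;
            }
        }
        if (!matched && want_not_matched)
        {
            assert(n < out_cap);
            out[n].target_rowid = ROWID_NULL;
            out[n].source_key = source[i].key;
            n++;
        }
    }
    return n;
}

/* A join row is handed to the WHEN MATCHED clauses iff its row identity is not NULL. */
bool
row_is_matched(const JoinRow *row)
{
    return row->target_rowid != ROWID_NULL;
}
```

With `want_not_matched` set to false, every row the sketch produces satisfies `row_is_matched`, which mirrors the statement that the junk columns are never NULL under an inner join.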
If WHEN MATCHED ends up processing a row that is concurrently updated or deleted,
EvalPlanQual (see below) is used to find the latest version of the row, and
that is re-fetched; if it exists, the search for a matching WHEN MATCHED clause
to use starts at the top.
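The restart behaviour can be illustrated with a small, self-contained C model. Everything here is hypothetical: a row is represented as an array of versions, "concurrent update" simply means that a newer version exists than the one the command first saw, and jumping to the last array element stands in for EvalPlanQual. What happens when the row no longer exists is not described above, so the sketch just reports that no WHEN MATCHED clause was chosen.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of one version of a target row. */
typedef struct
{
    int  balance;
    bool deleted;           /* true if this version records a deletion */
} RowVersion;

/* A WHEN MATCHED condition, evaluated against one row version. */
typedef bool (*WhenQual) (const RowVersion *row);

/* Two example conditions, used in the usage note below. */
bool qual_balance_over_50(const RowVersion *row) { return row->balance > 50; }
bool qual_always(const RowVersion *row) { (void) row; return true; }

/* Return the index of the first condition that holds, or -1 if none does. */
static int
first_true_qual(const RowVersion *row, const WhenQual *quals, size_t nquals)
{
    for (size_t i = 0; i < nquals; i++)
        if (quals[i](row))
            return (int) i;
    return -1;
}

/*
 * versions[seen] is the version the command first saw; later elements were
 * produced by concurrent sessions.  Returns the index of the WHEN MATCHED
 * clause finally used, or -1 if none applies or the row was deleted.
 */
int
choose_matched_clause(const RowVersion *versions, size_t nversions,
                      size_t seen, const WhenQual *quals, size_t nquals)
{
    assert(seen < nversions);

    for (;;)
    {
        int chosen = first_true_qual(&versions[seen], quals, nquals);

        if (chosen < 0)
            return -1;          /* no WHEN MATCHED clause applies */
        if (seen == nversions - 1)
            return chosen;      /* row unchanged since we saw it: act on it */

        /* Concurrent change detected: re-fetch the latest version ... */
        seen = nversions - 1;
        if (versions[seen].deleted)
            return -1;          /* row is gone; not modelled further */
        /* ... and restart the clause search from the top. */
    }
}
```

For example, with the clauses `{qual_balance_over_50, qual_always}` and a row whose balance was concurrently lowered from 100 to 20, the first pass selects clause 0, the re-fetch sees balance 20, and the restarted search selects clause 1 instead.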
MERGE does not allow its own type of triggers, but instead fires UPDATE, DELETE,
and INSERT triggers: row triggers are fired for each row when an action is
executed for that row. Statement triggers are always fired, regardless of
whether any rows match the corresponding clauses.
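The difference between row and statement triggers can be made concrete with a counting sketch. This is a hypothetical model, not executor code: `ActionKind`, `TriggerCounts` and `count_trigger_firings` are invented names, and the sketch assumes that a statement trigger fires once for each action kind that the command names, which the text above implies but does not spell out. It counts one firing per trigger timing, so BEFORE and AFTER triggers would each see these numbers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { KIND_INSERT, KIND_UPDATE, KIND_DELETE, KIND_COUNT } ActionKind;

typedef struct
{
    int statement[KIND_COUNT];  /* statement-level firings per action kind */
    int row[KIND_COUNT];        /* row-level firings per action kind */
} TriggerCounts;

/*
 * command_has[k] says whether the MERGE command contains an action of kind k;
 * executed[] lists the action actually carried out for each affected row.
 */
TriggerCounts
count_trigger_firings(const bool command_has[KIND_COUNT],
                      const ActionKind *executed, size_t nexecuted)
{
    TriggerCounts counts = {{0}, {0}};

    /* Statement triggers: fired whether or not any row used the action. */
    for (int k = 0; k < KIND_COUNT; k++)
        if (command_has[k])
            counts.statement[k] = 1;

    /* Row triggers: fired once per row for the action executed on it. */
    for (size_t i = 0; i < nexecuted; i++)
    {
        assert(executed[i] < KIND_COUNT);
        counts.row[executed[i]]++;
    }
    return counts;
}
```

A command with INSERT and UPDATE actions that ends up updating two rows would, in this model, fire the INSERT statement trigger once even though no INSERT row trigger fires.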
Memory Management
-----------------

View File

@ -233,6 +233,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
case CMD_INSERT:
case CMD_DELETE:
case CMD_UPDATE:
case CMD_MERGE:
estate->es_output_cid = GetCurrentCommandId(true);
break;
@ -1244,6 +1245,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_ReturningSlot = NULL;
resultRelInfo->ri_TrigOldSlot = NULL;
resultRelInfo->ri_TrigNewSlot = NULL;
resultRelInfo->ri_matchedMergeAction = NIL;
resultRelInfo->ri_notMatchedMergeAction = NIL;
/*
* Only ExecInitPartitionInfo() and ExecInitPartitionDispatchInfo() pass
@ -2142,6 +2145,19 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
errmsg("new row violates row-level security policy for table \"%s\"",
wco->relname)));
break;
case WCO_RLS_MERGE_UPDATE_CHECK:
case WCO_RLS_MERGE_DELETE_CHECK:
    if (wco->polname != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                 errmsg("target row violates row-level security policy \"%s\" (USING expression) for table \"%s\"",
                        wco->polname, wco->relname)));
    else
        ereport(ERROR,
                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                 errmsg("target row violates row-level security policy (USING expression) for table \"%s\"",
                        wco->relname)));
    break;
case WCO_RLS_CONFLICT_CHECK:
if (wco->polname != NULL)
ereport(ERROR,

View File

@ -20,6 +20,7 @@
#include "catalog/pg_type.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "foreign/fdwapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
@ -182,6 +183,7 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
bool *isnull,
int maxfieldlen);
static List *adjust_partition_colnos(List *colnos, ResultRelInfo *leaf_part_rri);
static List *adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap);
static void ExecInitPruningContext(PartitionPruneContext *context,
List *pruning_steps,
PartitionDesc partdesc,
@ -853,6 +855,99 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
lappend(estate->es_tuple_routing_result_relations,
leaf_part_rri);
/*
 * Initialize information about this partition that's needed to handle
 * MERGE.  We take the "first" result relation's mergeActionList as
 * reference and make a copy for this relation, converting stuff that
 * references attribute numbers to match this relation's.
 *
 * This duplicates much of the logic in ExecInitMerge(), so if something
 * changes there, look here too.
 */
if (node && node->operation == CMD_MERGE)
{
    List *firstMergeActionList = linitial(node->mergeActionLists);
    ListCell *lc;
    ExprContext *econtext = mtstate->ps.ps_ExprContext;

    if (part_attmap == NULL)
        part_attmap =
            build_attrmap_by_name(RelationGetDescr(partrel),
                                  RelationGetDescr(firstResultRel));

    if (unlikely(!leaf_part_rri->ri_projectNewInfoValid))
        ExecInitMergeTupleSlots(mtstate, leaf_part_rri);

    foreach(lc, firstMergeActionList)
    {
        /* Make a copy for this relation to be safe. */
        MergeAction *action = copyObject(lfirst(lc));
        MergeActionState *action_state;
        List **list;

        /* Generate the action's state for this relation */
        action_state = makeNode(MergeActionState);
        action_state->mas_action = action;

        /* And put the action in the appropriate list */
        if (action->matched)
            list = &leaf_part_rri->ri_matchedMergeAction;
        else
            list = &leaf_part_rri->ri_notMatchedMergeAction;
        *list = lappend(*list, action_state);

        switch (action->commandType)
        {
            case CMD_INSERT:
                /*
                 * ExecCheckPlanOutput() was already run on the targetlist
                 * when the "first" result relation was initialized, and
                 * the targetlist is the same for all result relations.
                 */
                action_state->mas_proj =
                    ExecBuildProjectionInfo(action->targetList, econtext,
                                            leaf_part_rri->ri_newTupleSlot,
                                            &mtstate->ps,
                                            RelationGetDescr(partrel));
                break;
            case CMD_UPDATE:
                /*
                 * Convert updateColnos from "first" result relation
                 * attribute numbers to this result rel's.
                 */
                if (part_attmap)
                    action->updateColnos =
                        adjust_partition_colnos_using_map(action->updateColnos,
                                                          part_attmap);
                action_state->mas_proj =
                    ExecBuildUpdateProjection(action->targetList,
                                              true,
                                              action->updateColnos,
                                              RelationGetDescr(leaf_part_rri->ri_RelationDesc),
                                              econtext,
                                              leaf_part_rri->ri_newTupleSlot,
                                              NULL);
                break;
            case CMD_DELETE:
                break;
            default:
                elog(ERROR, "unknown action in MERGE WHEN clause");
        }

        /* found_whole_row intentionally ignored. */
        action->qual =
            map_variable_attnos(action->qual,
                                firstVarno, 0,
                                part_attmap,
                                RelationGetForm(partrel)->reltype,
                                &found_whole_row);
        action_state->mas_whenqual =
            ExecInitQual((List *) action->qual, &mtstate->ps);
    }
}
MemoryContextSwitchTo(oldcxt);
return leaf_part_rri;
@ -1433,13 +1528,23 @@ ExecBuildSlotPartitionKeyDescription(Relation rel,
 static List *
 adjust_partition_colnos(List *colnos, ResultRelInfo *leaf_part_rri)
 {
-    List *new_colnos = NIL;
     TupleConversionMap *map = ExecGetChildToRootMap(leaf_part_rri);
-    AttrMap *attrMap;
+    return adjust_partition_colnos_using_map(colnos, map->attrMap);
+}
+/*
+ * adjust_partition_colnos_using_map
+ *     Like adjust_partition_colnos, but uses a caller-supplied map instead
+ *     of assuming to map from the "root" result relation.
+ */
+static List *
+adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap)
+{
+    List *new_colnos = NIL;
     ListCell *lc;
-    Assert(map != NULL); /* else we shouldn't be here */
-    attrMap = map->attrMap;
+    Assert(attrMap != NULL); /* else we shouldn't be here */
     foreach(lc, colnos)
     {
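The hunk above ends just as the translation loop begins, so its body is not shown here. As a rough illustration of what remapping column numbers through an attribute map involves, here is a self-contained C sketch. It is an assumption-laden model rather than the actual function: `SimpleAttrMap` and `remap_colnos` are hypothetical, plain arrays replace PostgreSQL's `List` and `AttrMap`, and a boolean return value replaces error reporting.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical simplified attribute map: attnums[i] is the column number in
 * the destination relation that corresponds to column i + 1 of the source
 * relation, or 0 if there is no corresponding column.
 */
typedef struct
{
    const int *attnums;
    int        maplen;
} SimpleAttrMap;

/*
 * Translate each 1-based column number in colnos[] through the map and
 * store the result in out[].  Returns false if a column number is out of
 * range or has no counterpart in the destination relation.
 */
bool
remap_colnos(const int *colnos, size_t ncols, const SimpleAttrMap *map, int *out)
{
    assert(map != NULL);    /* mirrors the Assert on the caller-supplied map */

    for (size_t i = 0; i < ncols; i++)
    {
        int src = colnos[i];

        if (src <= 0 || src > map->maplen || map->attnums[src - 1] == 0)
            return false;
        out[i] = map->attnums[src - 1];
    }
    return true;
}
```

Passing the map in explicitly, rather than deriving it from the leaf partition, is what lets the same translation be reused with a map built against the "first" result relation.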

View File

@ -486,7 +486,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     resultRelInfo->ri_TrigDesc->trig_update_before_row)
 {
     if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
-                              tid, NULL, slot))
+                              tid, NULL, slot, NULL))
         skip_tuple = true; /* "do nothing" */
 }

File diff suppressed because it is too large

View File

@ -2881,6 +2881,9 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
else
res = SPI_OK_UPDATE;
break;
case CMD_MERGE:
res = SPI_OK_MERGE;
break;
default:
return SPI_ERROR_OPUNKNOWN;
}