1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Handle INSERT .. ON CONFLICT with partitioned tables

Commit eb7ed3f306 enabled unique constraints on partitioned tables,
but one thing that was not working properly is INSERT/ON CONFLICT.
This commit introduces a new node keeps state related to the ON CONFLICT
clause per partition, and fills it when that partition is about to be
used for tuple routing.

Author: Amit Langote, Álvaro Herrera
Reviewed-by: Etsuro Fujita, Pavan Deolasee
Discussion: https://postgr.es/m/20180228004602.cwdyralmg5ejdqkq@alvherre.pgsql
This commit is contained in:
Alvaro Herrera
2018-03-26 10:43:54 -03:00
parent 1b89c2188b
commit 555ee77a96
14 changed files with 638 additions and 84 deletions

View File

@ -1347,11 +1347,15 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_FdwRoutine = GetFdwRoutineForRelation(resultRelationDesc, true);
else
resultRelInfo->ri_FdwRoutine = NULL;
/* The following fields are set later if needed */
resultRelInfo->ri_FdwState = NULL;
resultRelInfo->ri_usesFdwDirectModify = false;
resultRelInfo->ri_ConstraintExprs = NULL;
resultRelInfo->ri_junkFilter = NULL;
resultRelInfo->ri_projectReturning = NULL;
resultRelInfo->ri_onConflictArbiterIndexes = NIL;
resultRelInfo->ri_onConflict = NULL;
/*
* Partition constraint, which also includes the partition constraint of

View File

@ -15,10 +15,12 @@
#include "postgres.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_type.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "utils/lsyscache.h"
#include "utils/rls.h"
#include "utils/ruleutils.h"
@ -36,6 +38,8 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
Datum *values,
bool *isnull,
int maxfieldlen);
static List *adjust_partition_tlist(List *tlist, TupleConversionMap *map);
/*
* ExecSetupPartitionTupleRouting - sets up information needed during
@ -64,6 +68,8 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
int num_update_rri = 0,
update_rri_index = 0;
PartitionTupleRouting *proute;
int nparts;
ModifyTable *node = mtstate ? (ModifyTable *) mtstate->ps.plan : NULL;
/*
* Get the information about the partition tree after locking all the
@ -74,20 +80,16 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
proute->partition_dispatch_info =
RelationGetPartitionDispatchInfo(rel, &proute->num_dispatch,
&leaf_parts);
proute->num_partitions = list_length(leaf_parts);
proute->partitions = (ResultRelInfo **) palloc(proute->num_partitions *
sizeof(ResultRelInfo *));
proute->num_partitions = nparts = list_length(leaf_parts);
proute->partitions =
(ResultRelInfo **) palloc(nparts * sizeof(ResultRelInfo *));
proute->parent_child_tupconv_maps =
(TupleConversionMap **) palloc0(proute->num_partitions *
sizeof(TupleConversionMap *));
proute->partition_oids = (Oid *) palloc(proute->num_partitions *
sizeof(Oid));
(TupleConversionMap **) palloc0(nparts * sizeof(TupleConversionMap *));
proute->partition_oids = (Oid *) palloc(nparts * sizeof(Oid));
/* Set up details specific to the type of tuple routing we are doing. */
if (mtstate && mtstate->operation == CMD_UPDATE)
if (node && node->operation == CMD_UPDATE)
{
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
update_rri = mtstate->resultRelInfo;
num_update_rri = list_length(node->plans);
proute->subplan_partition_offsets =
@ -328,7 +330,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
*/
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
leaf_part_rri = (ResultRelInfo *) palloc0(sizeof(ResultRelInfo));
leaf_part_rri = makeNode(ResultRelInfo);
InitResultRelInfo(leaf_part_rri,
partrel,
node ? node->nominalRelation : 1,
@ -475,9 +477,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
&mtstate->ps, RelationGetDescr(partrel));
}
Assert(proute->partitions[partidx] == NULL);
proute->partitions[partidx] = leaf_part_rri;
/*
* Save a tuple conversion map to convert a tuple routed to this partition
* from the parent's type to the partition's.
@ -487,6 +486,145 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
RelationGetDescr(partrel),
gettext_noop("could not convert row type"));
/*
* If there is an ON CONFLICT clause, initialize state for it.
*/
if (node && node->onConflictAction != ONCONFLICT_NONE)
{
TupleConversionMap *map = proute->parent_child_tupconv_maps[partidx];
int firstVarno = mtstate->resultRelInfo[0].ri_RangeTableIndex;
Relation firstResultRel = mtstate->resultRelInfo[0].ri_RelationDesc;
TupleDesc partrelDesc = RelationGetDescr(partrel);
ExprContext *econtext = mtstate->ps.ps_ExprContext;
ListCell *lc;
List *arbiterIndexes = NIL;
/*
* If there is a list of arbiter indexes, map it to a list of indexes
* in the partition. We do that by scanning the partition's index
* list and searching for ancestry relationships to each index in the
* ancestor table.
*/
if (list_length(resultRelInfo->ri_onConflictArbiterIndexes) > 0)
{
List *childIdxs;
childIdxs = RelationGetIndexList(leaf_part_rri->ri_RelationDesc);
foreach(lc, childIdxs)
{
Oid childIdx = lfirst_oid(lc);
List *ancestors;
ListCell *lc2;
ancestors = get_partition_ancestors(childIdx);
foreach(lc2, resultRelInfo->ri_onConflictArbiterIndexes)
{
if (list_member_oid(ancestors, lfirst_oid(lc2)))
arbiterIndexes = lappend_oid(arbiterIndexes, childIdx);
}
list_free(ancestors);
}
}
/*
* If the resulting lists are of inequal length, something is wrong.
* (This shouldn't happen, since arbiter index selection should not
* pick up an invalid index.)
*/
if (list_length(resultRelInfo->ri_onConflictArbiterIndexes) !=
list_length(arbiterIndexes))
elog(ERROR, "invalid arbiter index list");
leaf_part_rri->ri_onConflictArbiterIndexes = arbiterIndexes;
/*
* In the DO UPDATE case, we have some more state to initialize.
*/
if (node->onConflictAction == ONCONFLICT_UPDATE)
{
Assert(node->onConflictSet != NIL);
Assert(resultRelInfo->ri_onConflict != NULL);
/*
* If the partition's tuple descriptor matches exactly the root
* parent (the common case), we can simply re-use the parent's ON
* CONFLICT SET state, skipping a bunch of work. Otherwise, we
* need to create state specific to this partition.
*/
if (map == NULL)
leaf_part_rri->ri_onConflict = resultRelInfo->ri_onConflict;
else
{
List *onconflset;
TupleDesc tupDesc;
bool found_whole_row;
leaf_part_rri->ri_onConflict = makeNode(OnConflictSetState);
/*
* Translate expressions in onConflictSet to account for
* different attribute numbers. For that, map partition
* varattnos twice: first to catch the EXCLUDED
* pseudo-relation (INNER_VAR), and second to handle the main
* target relation (firstVarno).
*/
onconflset = (List *) copyObject((Node *) node->onConflictSet);
onconflset =
map_partition_varattnos(onconflset, INNER_VAR, partrel,
firstResultRel, &found_whole_row);
Assert(!found_whole_row);
onconflset =
map_partition_varattnos(onconflset, firstVarno, partrel,
firstResultRel, &found_whole_row);
Assert(!found_whole_row);
/* Finally, adjust this tlist to match the partition. */
onconflset = adjust_partition_tlist(onconflset, map);
/*
* Build UPDATE SET's projection info. The user of this
* projection is responsible for setting the slot's tupdesc!
* We set aside a tupdesc that's good for the common case of a
* partition that's tupdesc-equal to the partitioned table;
* partitions of different tupdescs must generate their own.
*/
tupDesc = ExecTypeFromTL(onconflset, partrelDesc->tdhasoid);
ExecSetSlotDescriptor(mtstate->mt_conflproj, tupDesc);
leaf_part_rri->ri_onConflict->oc_ProjInfo =
ExecBuildProjectionInfo(onconflset, econtext,
mtstate->mt_conflproj,
&mtstate->ps, partrelDesc);
leaf_part_rri->ri_onConflict->oc_ProjTupdesc = tupDesc;
/*
* If there is a WHERE clause, initialize state where it will
* be evaluated, mapping the attribute numbers appropriately.
* As with onConflictSet, we need to map partition varattnos
* to the partition's tupdesc.
*/
if (node->onConflictWhere)
{
List *clause;
clause = copyObject((List *) node->onConflictWhere);
clause = map_partition_varattnos(clause, INNER_VAR,
partrel, firstResultRel,
&found_whole_row);
Assert(!found_whole_row);
clause = map_partition_varattnos(clause, firstVarno,
partrel, firstResultRel,
&found_whole_row);
Assert(!found_whole_row);
leaf_part_rri->ri_onConflict->oc_WhereClause =
ExecInitQual((List *) clause, &mtstate->ps);
}
}
}
}
Assert(proute->partitions[partidx] == NULL);
proute->partitions[partidx] = leaf_part_rri;
MemoryContextSwitchTo(oldContext);
return leaf_part_rri;
@ -946,3 +1084,70 @@ ExecBuildSlotPartitionKeyDescription(Relation rel,
return buf.data;
}
/*
* adjust_partition_tlist
* Adjust the targetlist entries for a given partition to account for
* attribute differences between parent and the partition
*
* The expressions have already been fixed, but here we fix the list to make
* target resnos match the partition's attribute numbers. This results in a
* copy of the original target list in which the entries appear in resno
* order, including both the existing entries (that may have their resno
* changed in-place) and the newly added entries for columns that don't exist
* in the parent.
*
* Scribbles on the input tlist, so callers must make sure to make a copy
* before passing it to us.
*/
static List *
adjust_partition_tlist(List *tlist, TupleConversionMap *map)
{
List *new_tlist = NIL;
TupleDesc tupdesc = map->outdesc;
AttrNumber *attrMap = map->attrMap;
AttrNumber attrno;
for (attrno = 1; attrno <= tupdesc->natts; attrno++)
{
Form_pg_attribute att_tup = TupleDescAttr(tupdesc, attrno - 1);
TargetEntry *tle;
if (attrMap[attrno - 1] != InvalidAttrNumber)
{
Assert(!att_tup->attisdropped);
/*
* Use the corresponding entry from the parent's tlist, adjusting
* the resno the match the partition's attno.
*/
tle = (TargetEntry *) list_nth(tlist, attrMap[attrno - 1] - 1);
tle->resno = attrno;
}
else
{
Const *expr;
/*
* For a dropped attribute in the partition, generate a dummy
* entry with resno matching the partition's attno.
*/
Assert(att_tup->attisdropped);
expr = makeConst(INT4OID,
-1,
InvalidOid,
sizeof(int32),
(Datum) 0,
true, /* isnull */
true /* byval */ );
tle = makeTargetEntry((Expr *) expr,
attrno,
pstrdup(NameStr(att_tup->attname)),
false);
}
new_tlist = lappend(new_tlist, tle);
}
return new_tlist;
}

View File

@ -422,7 +422,7 @@ ExecInsert(ModifyTableState *mtstate,
bool specConflict;
List *arbiterIndexes;
arbiterIndexes = node->arbiterIndexes;
arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes;
/*
* Do a non-conclusive check for conflicts first.
@ -1055,6 +1055,18 @@ lreplace:;
int map_index;
TupleConversionMap *tupconv_map;
/*
* Disallow an INSERT ON CONFLICT DO UPDATE that causes the
* original row to migrate to a different partition. Maybe this
* can be implemented some day, but it seems a fringe feature with
* little redeeming value.
*/
if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalid ON UPDATE specification"),
errdetail("The result tuple would appear in a different partition than the original tuple.")));
/*
* When an UPDATE is run on a leaf partition, we will not have
* partition tuple routing set up. In that case, fail with
@ -1313,7 +1325,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
{
ExprContext *econtext = mtstate->ps.ps_ExprContext;
Relation relation = resultRelInfo->ri_RelationDesc;
ExprState *onConflictSetWhere = resultRelInfo->ri_onConflictSetWhere;
ExprState *onConflictSetWhere = resultRelInfo->ri_onConflict->oc_WhereClause;
HeapTupleData tuple;
HeapUpdateFailureData hufd;
LockTupleMode lockmode;
@ -1462,7 +1474,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
}
/* Project the new tuple version */
ExecProject(resultRelInfo->ri_onConflictSetProj);
ExecProject(resultRelInfo->ri_onConflict->oc_ProjInfo);
/*
* Note that it is possible that the target tuple has been modified in
@ -1639,6 +1651,7 @@ ExecPrepareTupleRouting(ModifyTableState *mtstate,
ResultRelInfo *targetRelInfo,
TupleTableSlot *slot)
{
ModifyTable *node;
int partidx;
ResultRelInfo *partrel;
HeapTuple tuple;
@ -1720,6 +1733,19 @@ ExecPrepareTupleRouting(ModifyTableState *mtstate,
proute->partition_tuple_slot,
&slot);
/* Initialize information needed to handle ON CONFLICT DO UPDATE. */
Assert(mtstate != NULL);
node = (ModifyTable *) mtstate->ps.plan;
if (node->onConflictAction == ONCONFLICT_UPDATE)
{
Assert(mtstate->mt_existing != NULL);
ExecSetSlotDescriptor(mtstate->mt_existing,
RelationGetDescr(partrel->ri_RelationDesc));
Assert(mtstate->mt_conflproj != NULL);
ExecSetSlotDescriptor(mtstate->mt_conflproj,
partrel->ri_onConflict->oc_ProjTupdesc);
}
return slot;
}
@ -2347,11 +2373,15 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate->ps.ps_ExprContext = NULL;
}
/* Set the list of arbiter indexes if needed for ON CONFLICT */
resultRelInfo = mtstate->resultRelInfo;
if (node->onConflictAction != ONCONFLICT_NONE)
resultRelInfo->ri_onConflictArbiterIndexes = node->arbiterIndexes;
/*
* If needed, Initialize target list, projection and qual for ON CONFLICT
* DO UPDATE.
*/
resultRelInfo = mtstate->resultRelInfo;
if (node->onConflictAction == ONCONFLICT_UPDATE)
{
ExprContext *econtext;
@ -2368,34 +2398,54 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
econtext = mtstate->ps.ps_ExprContext;
relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
/* initialize slot for the existing tuple */
/*
* Initialize slot for the existing tuple. If we'll be performing
* tuple routing, the tuple descriptor to use for this will be
* determined based on which relation the update is actually applied
* to, so we don't set its tuple descriptor here.
*/
mtstate->mt_existing =
ExecInitExtraTupleSlot(mtstate->ps.state, relationDesc);
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/* carried forward solely for the benefit of explain */
mtstate->mt_excludedtlist = node->exclRelTlist;
/* create target slot for UPDATE SET projection */
/* create state for DO UPDATE SET operation */
resultRelInfo->ri_onConflict = makeNode(OnConflictSetState);
/*
* Create the tuple slot for the UPDATE SET projection.
*
* Just like mt_existing above, we leave it without a tuple descriptor
* in the case of partitioning tuple routing, so that it can be
* changed by ExecPrepareTupleRouting. In that case, we still save
* the tupdesc in the parent's state: it can be reused by partitions
* with an identical descriptor to the parent.
*/
tupDesc = ExecTypeFromTL((List *) node->onConflictSet,
relationDesc->tdhasoid);
mtstate->mt_conflproj =
ExecInitExtraTupleSlot(mtstate->ps.state, tupDesc);
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : tupDesc);
resultRelInfo->ri_onConflict->oc_ProjTupdesc = tupDesc;
/* build UPDATE SET projection state */
resultRelInfo->ri_onConflictSetProj =
resultRelInfo->ri_onConflict->oc_ProjInfo =
ExecBuildProjectionInfo(node->onConflictSet, econtext,
mtstate->mt_conflproj, &mtstate->ps,
relationDesc);
/* build DO UPDATE WHERE clause expression */
/* initialize state to evaluate the WHERE clause, if any */
if (node->onConflictWhere)
{
ExprState *qualexpr;
qualexpr = ExecInitQual((List *) node->onConflictWhere,
&mtstate->ps);
resultRelInfo->ri_onConflictSetWhere = qualexpr;
resultRelInfo->ri_onConflict->oc_WhereClause = qualexpr;
}
}