mirror of
https://github.com/postgres/postgres.git
synced 2025-11-09 06:21:09 +03:00
Allow insert and update tuple routing and COPY for foreign tables.
Also enable this for postgres_fdw. Etsuro Fujita, based on an earlier patch by Amit Langote. The larger patch series of which this is a part has been reviewed by Amit Langote, David Fetter, Maksim Milyutin, Álvaro Herrera, Stephen Frost, and me. Minor documentation changes to the final version by me. Discussion: http://postgr.es/m/29906a26-da12-8c86-4fb9-d8f88442f2b9@lab.ntt.co.jp
This commit is contained in:
@@ -31,6 +31,7 @@
|
||||
#include "commands/trigger.h"
|
||||
#include "executor/execPartition.h"
|
||||
#include "executor/executor.h"
|
||||
#include "foreign/fdwapi.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
@@ -2302,6 +2303,7 @@ CopyFrom(CopyState cstate)
|
||||
ResultRelInfo *resultRelInfo;
|
||||
ResultRelInfo *saved_resultRelInfo = NULL;
|
||||
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
|
||||
ModifyTableState *mtstate;
|
||||
ExprContext *econtext;
|
||||
TupleTableSlot *myslot;
|
||||
MemoryContext oldcontext = CurrentMemoryContext;
|
||||
@@ -2323,11 +2325,12 @@ CopyFrom(CopyState cstate)
|
||||
Assert(cstate->rel);
|
||||
|
||||
/*
|
||||
* The target must be a plain relation or have an INSTEAD OF INSERT row
|
||||
* trigger. (Currently, such triggers are only allowed on views, so we
|
||||
* only hint about them in the view case.)
|
||||
* The target must be a plain, foreign, or partitioned relation, or have
|
||||
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
|
||||
* allowed on views, so we only hint about them in the view case.)
|
||||
*/
|
||||
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
|
||||
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
|
||||
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
|
||||
!(cstate->rel->trigdesc &&
|
||||
cstate->rel->trigdesc->trig_insert_instead_row))
|
||||
@@ -2343,11 +2346,6 @@ CopyFrom(CopyState cstate)
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("cannot copy to materialized view \"%s\"",
|
||||
RelationGetRelationName(cstate->rel))));
|
||||
else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("cannot copy to foreign table \"%s\"",
|
||||
RelationGetRelationName(cstate->rel))));
|
||||
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
@@ -2454,6 +2452,9 @@ CopyFrom(CopyState cstate)
|
||||
NULL,
|
||||
0);
|
||||
|
||||
/* Verify the named relation is a valid target for INSERT */
|
||||
CheckValidResultRel(resultRelInfo, CMD_INSERT);
|
||||
|
||||
ExecOpenIndices(resultRelInfo, false);
|
||||
|
||||
estate->es_result_relations = resultRelInfo;
|
||||
@@ -2466,6 +2467,21 @@ CopyFrom(CopyState cstate)
|
||||
/* Triggers might need a slot as well */
|
||||
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
|
||||
|
||||
/*
|
||||
* Set up a ModifyTableState so we can let FDW(s) init themselves for
|
||||
* foreign-table result relation(s).
|
||||
*/
|
||||
mtstate = makeNode(ModifyTableState);
|
||||
mtstate->ps.plan = NULL;
|
||||
mtstate->ps.state = estate;
|
||||
mtstate->operation = CMD_INSERT;
|
||||
mtstate->resultRelInfo = estate->es_result_relations;
|
||||
|
||||
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
||||
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
|
||||
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
|
||||
resultRelInfo);
|
||||
|
||||
/* Prepare to catch AFTER triggers. */
|
||||
AfterTriggerBeginQuery();
|
||||
|
||||
@@ -2507,11 +2523,12 @@ CopyFrom(CopyState cstate)
|
||||
* expressions. Such triggers or expressions might query the table we're
|
||||
* inserting to, and act differently if the tuples that have already been
|
||||
* processed and prepared for insertion are not there. We also can't do
|
||||
* it if the table is partitioned.
|
||||
* it if the table is foreign or partitioned.
|
||||
*/
|
||||
if ((resultRelInfo->ri_TrigDesc != NULL &&
|
||||
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
|
||||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
|
||||
resultRelInfo->ri_FdwRoutine != NULL ||
|
||||
cstate->partition_tuple_routing != NULL ||
|
||||
cstate->volatile_defexprs)
|
||||
{
|
||||
@@ -2626,19 +2643,13 @@ CopyFrom(CopyState cstate)
|
||||
resultRelInfo = proute->partitions[leaf_part_index];
|
||||
if (resultRelInfo == NULL)
|
||||
{
|
||||
resultRelInfo = ExecInitPartitionInfo(NULL,
|
||||
resultRelInfo = ExecInitPartitionInfo(mtstate,
|
||||
saved_resultRelInfo,
|
||||
proute, estate,
|
||||
leaf_part_index);
|
||||
Assert(resultRelInfo != NULL);
|
||||
}
|
||||
|
||||
/* We do not yet have a way to insert into a foreign partition */
|
||||
if (resultRelInfo->ri_FdwRoutine)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot route inserted tuples to a foreign table")));
|
||||
|
||||
/*
|
||||
* For ExecInsertIndexTuples() to work on the partition's indexes
|
||||
*/
|
||||
@@ -2726,9 +2737,13 @@ CopyFrom(CopyState cstate)
|
||||
resultRelInfo->ri_TrigDesc->trig_insert_before_row))
|
||||
check_partition_constr = false;
|
||||
|
||||
/* Check the constraints of the tuple */
|
||||
if (resultRelInfo->ri_RelationDesc->rd_att->constr ||
|
||||
check_partition_constr)
|
||||
/*
|
||||
* If the target is a plain table, check the constraints of
|
||||
* the tuple.
|
||||
*/
|
||||
if (resultRelInfo->ri_FdwRoutine == NULL &&
|
||||
(resultRelInfo->ri_RelationDesc->rd_att->constr ||
|
||||
check_partition_constr))
|
||||
ExecConstraints(resultRelInfo, slot, estate, true);
|
||||
|
||||
if (useHeapMultiInsert)
|
||||
@@ -2760,10 +2775,32 @@ CopyFrom(CopyState cstate)
|
||||
{
|
||||
List *recheckIndexes = NIL;
|
||||
|
||||
/* OK, store the tuple and create index entries for it */
|
||||
heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
|
||||
hi_options, bistate);
|
||||
/* OK, store the tuple */
|
||||
if (resultRelInfo->ri_FdwRoutine != NULL)
|
||||
{
|
||||
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
|
||||
resultRelInfo,
|
||||
slot,
|
||||
NULL);
|
||||
|
||||
if (slot == NULL) /* "do nothing" */
|
||||
goto next_tuple;
|
||||
|
||||
/* FDW might have changed tuple */
|
||||
tuple = ExecMaterializeSlot(slot);
|
||||
|
||||
/*
|
||||
* AFTER ROW Triggers might reference the tableoid
|
||||
* column, so initialize t_tableOid before evaluating
|
||||
* them.
|
||||
*/
|
||||
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
||||
}
|
||||
else
|
||||
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
|
||||
mycid, hi_options, bistate);
|
||||
|
||||
/* And create index entries for it */
|
||||
if (resultRelInfo->ri_NumIndices > 0)
|
||||
recheckIndexes = ExecInsertIndexTuples(slot,
|
||||
&(tuple->t_self),
|
||||
@@ -2781,13 +2818,14 @@ CopyFrom(CopyState cstate)
|
||||
}
|
||||
|
||||
/*
|
||||
* We count only tuples not suppressed by a BEFORE INSERT trigger;
|
||||
* this is the same definition used by execMain.c for counting
|
||||
* tuples inserted by an INSERT command.
|
||||
* We count only tuples not suppressed by a BEFORE INSERT trigger
|
||||
* or FDW; this is the same definition used by nodeModifyTable.c
|
||||
* for counting tuples inserted by an INSERT command.
|
||||
*/
|
||||
processed++;
|
||||
}
|
||||
|
||||
next_tuple:
|
||||
/* Restore the saved ResultRelInfo */
|
||||
if (saved_resultRelInfo)
|
||||
{
|
||||
@@ -2828,11 +2866,17 @@ CopyFrom(CopyState cstate)
|
||||
|
||||
ExecResetTupleTable(estate->es_tupleTable, false);
|
||||
|
||||
/* Allow the FDW to shut down */
|
||||
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
||||
resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
|
||||
resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
|
||||
resultRelInfo);
|
||||
|
||||
ExecCloseIndices(resultRelInfo);
|
||||
|
||||
/* Close all the partitioned tables, leaf partitions, and their indices */
|
||||
if (cstate->partition_tuple_routing)
|
||||
ExecCleanupTupleRouting(cstate->partition_tuple_routing);
|
||||
ExecCleanupTupleRouting(mtstate, cstate->partition_tuple_routing);
|
||||
|
||||
/* Close any trigger target relations */
|
||||
ExecCleanUpTriggerState(estate);
|
||||
|
||||
@@ -1179,13 +1179,6 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
|
||||
switch (operation)
|
||||
{
|
||||
case CMD_INSERT:
|
||||
|
||||
/*
|
||||
* If foreign partition to do tuple-routing for, skip the
|
||||
* check; it's disallowed elsewhere.
|
||||
*/
|
||||
if (resultRelInfo->ri_PartitionRoot)
|
||||
break;
|
||||
if (fdwroutine->ExecForeignInsert == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
@@ -1378,6 +1371,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
|
||||
|
||||
resultRelInfo->ri_PartitionCheck = partition_check;
|
||||
resultRelInfo->ri_PartitionRoot = partition_root;
|
||||
resultRelInfo->ri_PartitionReadyForRouting = false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "catalog/pg_type.h"
|
||||
#include "executor/execPartition.h"
|
||||
#include "executor/executor.h"
|
||||
#include "foreign/fdwapi.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "miscadmin.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
@@ -55,12 +56,13 @@ static List *adjust_partition_tlist(List *tlist, TupleConversionMap *map);
|
||||
* see ExecInitPartitionInfo. However, if the function is invoked for update
|
||||
* tuple routing, caller would already have initialized ResultRelInfo's for
|
||||
* some of the partitions, which are reused and assigned to their respective
|
||||
* slot in the aforementioned array.
|
||||
* slot in the aforementioned array. For such partitions, we delay setting
|
||||
* up objects such as TupleConversionMap until those are actually chosen as
|
||||
* the partitions to route tuples to. See ExecPrepareTupleRouting.
|
||||
*/
|
||||
PartitionTupleRouting *
|
||||
ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
||||
{
|
||||
TupleDesc tupDesc = RelationGetDescr(rel);
|
||||
List *leaf_parts;
|
||||
ListCell *cell;
|
||||
int i;
|
||||
@@ -141,11 +143,7 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
||||
if (update_rri_index < num_update_rri &&
|
||||
RelationGetRelid(update_rri[update_rri_index].ri_RelationDesc) == leaf_oid)
|
||||
{
|
||||
Relation partrel;
|
||||
TupleDesc part_tupdesc;
|
||||
|
||||
leaf_part_rri = &update_rri[update_rri_index];
|
||||
partrel = leaf_part_rri->ri_RelationDesc;
|
||||
|
||||
/*
|
||||
* This is required in order to convert the partition's tuple to
|
||||
@@ -159,23 +157,6 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
||||
proute->subplan_partition_offsets[update_rri_index] = i;
|
||||
|
||||
update_rri_index++;
|
||||
|
||||
part_tupdesc = RelationGetDescr(partrel);
|
||||
|
||||
/*
|
||||
* Save a tuple conversion map to convert a tuple routed to this
|
||||
* partition from the parent's type to the partition's.
|
||||
*/
|
||||
proute->parent_child_tupconv_maps[i] =
|
||||
convert_tuples_by_name(tupDesc, part_tupdesc,
|
||||
gettext_noop("could not convert row type"));
|
||||
|
||||
/*
|
||||
* Verify result relation is a valid target for an INSERT. An
|
||||
* UPDATE of a partition-key becomes a DELETE+INSERT operation, so
|
||||
* this check is required even when the operation is CMD_UPDATE.
|
||||
*/
|
||||
CheckValidResultRel(leaf_part_rri, CMD_INSERT);
|
||||
}
|
||||
|
||||
proute->partitions[i] = leaf_part_rri;
|
||||
@@ -347,10 +328,10 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
PartitionTupleRouting *proute,
|
||||
EState *estate, int partidx)
|
||||
{
|
||||
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
|
||||
Relation rootrel = resultRelInfo->ri_RelationDesc,
|
||||
partrel;
|
||||
ResultRelInfo *leaf_part_rri;
|
||||
ModifyTable *node = mtstate ? (ModifyTable *) mtstate->ps.plan : NULL;
|
||||
MemoryContext oldContext;
|
||||
|
||||
/*
|
||||
@@ -374,13 +355,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
|
||||
leaf_part_rri->ri_PartitionLeafIndex = partidx;
|
||||
|
||||
/*
|
||||
* Verify result relation is a valid target for an INSERT. An UPDATE of a
|
||||
* partition-key becomes a DELETE+INSERT operation, so this check is still
|
||||
* required when the operation is CMD_UPDATE.
|
||||
*/
|
||||
CheckValidResultRel(leaf_part_rri, CMD_INSERT);
|
||||
|
||||
/*
|
||||
* Since we've just initialized this ResultRelInfo, it's not in any list
|
||||
* attached to the estate as yet. Add it, so that it can be found later.
|
||||
@@ -393,6 +367,9 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
lappend(estate->es_tuple_routing_result_relations,
|
||||
leaf_part_rri);
|
||||
|
||||
/* Set up information needed for routing tuples to this partition. */
|
||||
ExecInitRoutingInfo(mtstate, estate, proute, leaf_part_rri, partidx);
|
||||
|
||||
/*
|
||||
* Open partition indices. The user may have asked to check for conflicts
|
||||
* within this leaf partition and do "nothing" instead of throwing an
|
||||
@@ -498,6 +475,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
returningList = map_partition_varattnos(returningList, firstVarno,
|
||||
partrel, firstResultRel,
|
||||
NULL);
|
||||
leaf_part_rri->ri_returningList = returningList;
|
||||
|
||||
/*
|
||||
* Initialize the projection itself.
|
||||
@@ -514,15 +492,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
&mtstate->ps, RelationGetDescr(partrel));
|
||||
}
|
||||
|
||||
/*
|
||||
* Save a tuple conversion map to convert a tuple routed to this partition
|
||||
* from the parent's type to the partition's.
|
||||
*/
|
||||
proute->parent_child_tupconv_maps[partidx] =
|
||||
convert_tuples_by_name(RelationGetDescr(rootrel),
|
||||
RelationGetDescr(partrel),
|
||||
gettext_noop("could not convert row type"));
|
||||
|
||||
/*
|
||||
* If there is an ON CONFLICT clause, initialize state for it.
|
||||
*/
|
||||
@@ -751,6 +720,50 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
||||
return leaf_part_rri;
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecInitRoutingInfo
|
||||
* Set up information needed for routing tuples to a leaf partition if
|
||||
* routable; else abort the operation
|
||||
*/
|
||||
void
|
||||
ExecInitRoutingInfo(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
PartitionTupleRouting *proute,
|
||||
ResultRelInfo *partRelInfo,
|
||||
int partidx)
|
||||
{
|
||||
MemoryContext oldContext;
|
||||
|
||||
/* Verify the partition is a valid target for INSERT */
|
||||
CheckValidResultRel(partRelInfo, CMD_INSERT);
|
||||
|
||||
/*
|
||||
* Switch into per-query memory context.
|
||||
*/
|
||||
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
|
||||
|
||||
/*
|
||||
* Set up a tuple conversion map to convert a tuple routed to the
|
||||
* partition from the parent's type to the partition's.
|
||||
*/
|
||||
proute->parent_child_tupconv_maps[partidx] =
|
||||
convert_tuples_by_name(RelationGetDescr(partRelInfo->ri_PartitionRoot),
|
||||
RelationGetDescr(partRelInfo->ri_RelationDesc),
|
||||
gettext_noop("could not convert row type"));
|
||||
|
||||
/*
|
||||
* If the partition is a foreign table, let the FDW init itself for
|
||||
* routing tuples to the partition.
|
||||
*/
|
||||
if (partRelInfo->ri_FdwRoutine != NULL &&
|
||||
partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
|
||||
partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
partRelInfo->ri_PartitionReadyForRouting = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecSetupChildParentMapForLeaf -- Initialize the per-leaf-partition
|
||||
* child-to-root tuple conversion map array.
|
||||
@@ -853,7 +866,8 @@ ConvertPartitionTupleSlot(TupleConversionMap *map,
|
||||
* Close all the partitioned tables, leaf partitions, and their indices.
|
||||
*/
|
||||
void
|
||||
ExecCleanupTupleRouting(PartitionTupleRouting *proute)
|
||||
ExecCleanupTupleRouting(ModifyTableState *mtstate,
|
||||
PartitionTupleRouting *proute)
|
||||
{
|
||||
int i;
|
||||
int subplan_index = 0;
|
||||
@@ -881,6 +895,13 @@ ExecCleanupTupleRouting(PartitionTupleRouting *proute)
|
||||
if (resultRelInfo == NULL)
|
||||
continue;
|
||||
|
||||
/* Allow any FDWs to shut down if they've been exercised */
|
||||
if (resultRelInfo->ri_PartitionReadyForRouting &&
|
||||
resultRelInfo->ri_FdwRoutine != NULL &&
|
||||
resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
|
||||
resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
|
||||
resultRelInfo);
|
||||
|
||||
/*
|
||||
* If this result rel is one of the UPDATE subplan result rels, let
|
||||
* ExecEndPlan() close it. For INSERT or COPY,
|
||||
|
||||
@@ -1826,11 +1826,21 @@ ExecPrepareTupleRouting(ModifyTableState *mtstate,
|
||||
proute, estate,
|
||||
partidx);
|
||||
|
||||
/* We do not yet have a way to insert into a foreign partition */
|
||||
if (partrel->ri_FdwRoutine)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot route inserted tuples to a foreign table")));
|
||||
/*
|
||||
* Set up information needed for routing tuples to the partition if we
|
||||
* didn't yet (ExecInitRoutingInfo would abort the operation if the
|
||||
* partition isn't routable).
|
||||
*
|
||||
* Note: an UPDATE of a partition key invokes an INSERT that moves the
|
||||
* tuple to a new partition. This setup would be needed for a subplan
|
||||
* partition of such an UPDATE that is chosen as the partition to route
|
||||
* the tuple to. The reason we do this setup here rather than in
|
||||
* ExecSetupPartitionTupleRouting is to avoid aborting such an UPDATE
|
||||
* unnecessarily due to non-routable subplan partitions that may not be
|
||||
* chosen for update tuple movement after all.
|
||||
*/
|
||||
if (!partrel->ri_PartitionReadyForRouting)
|
||||
ExecInitRoutingInfo(mtstate, estate, proute, partrel, partidx);
|
||||
|
||||
/*
|
||||
* Make it look like we are inserting into the partition.
|
||||
@@ -2531,6 +2541,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
||||
{
|
||||
List *rlist = (List *) lfirst(l);
|
||||
|
||||
resultRelInfo->ri_returningList = rlist;
|
||||
resultRelInfo->ri_projectReturning =
|
||||
ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
|
||||
resultRelInfo->ri_RelationDesc->rd_att);
|
||||
@@ -2830,7 +2841,7 @@ ExecEndModifyTable(ModifyTableState *node)
|
||||
|
||||
/* Close all the partitioned tables, leaf partitions, and their indices */
|
||||
if (node->mt_partition_tuple_routing)
|
||||
ExecCleanupTupleRouting(node->mt_partition_tuple_routing);
|
||||
ExecCleanupTupleRouting(node, node->mt_partition_tuple_routing);
|
||||
|
||||
/*
|
||||
* Free the exprcontext
|
||||
|
||||
Reference in New Issue
Block a user