1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-30 21:42:05 +03:00

Fix transition tables for partition/inheritance.

We disallow row-level triggers with transition tables on child tables.
Transition tables for triggers on the parent table contain only those
columns present in the parent.  (We can't mix tuple formats in a
single transition table.)

Patch by Thomas Munro

Discussion: https://postgr.es/m/CA%2BTgmoZzTBBAsEUh4MazAN7ga%3D8SsMC-Knp-6cetts9yNZUCcg%40mail.gmail.com
This commit is contained in:
Andrew Gierth
2017-06-28 18:55:03 +01:00
parent 99255d73c0
commit 501ed02cf6
13 changed files with 1143 additions and 110 deletions

View File

@ -24,6 +24,7 @@
#include "catalog/objectaccess.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_constraint_fn.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
@ -96,7 +97,8 @@ static HeapTuple ExecCallTriggerFunc(TriggerData *trigdata,
static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int event, bool row_trigger,
HeapTuple oldtup, HeapTuple newtup,
List *recheckIndexes, Bitmapset *modifiedCols);
List *recheckIndexes, Bitmapset *modifiedCols,
TransitionCaptureState *transition_capture);
static void AfterTriggerEnlargeQueryState(void);
@ -354,13 +356,6 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
* adjustments will be needed below.
*/
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a partitioned table",
RelationGetRelationName(rel)),
errdetail("Triggers on partitioned tables cannot have transition tables.")));
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@ -375,6 +370,27 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
RelationGetRelationName(rel)),
errdetail("Triggers on views cannot have transition tables.")));
/*
* We currently don't allow row-level triggers with transition
* tables on partition or inheritance children. Such triggers
* would somehow need to see tuples converted to the format of the
* table they're attached to, and it's not clear which subset of
* tuples each child should see. See also the prohibitions in
* ATExecAttachPartition() and ATExecAddInherit().
*/
if (TRIGGER_FOR_ROW(tgtype) && has_superclass(rel->rd_id))
{
/* Use appropriate error message. */
if (rel->rd_rel->relispartition)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("ROW triggers with transition tables are not supported on partitions")));
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("ROW triggers with transition tables are not supported on inheritance children")));
}
if (stmt->timing != TRIGGER_TYPE_AFTER)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@ -2028,6 +2044,64 @@ equalTriggerDescs(TriggerDesc *trigdesc1, TriggerDesc *trigdesc2)
}
#endif /* NOT_USED */
/*
* Check if there is a row-level trigger with transition tables that prevents
* a table from becoming an inheritance child or partition. Return the name
* of the first such incompatible trigger, or NULL if there is none.
*/
const char *
FindTriggerIncompatibleWithInheritance(TriggerDesc *trigdesc)
{
if (trigdesc != NULL)
{
int i;
for (i = 0; i < trigdesc->numtriggers; ++i)
{
Trigger *trigger = &trigdesc->triggers[i];
if (trigger->tgoldtable != NULL || trigger->tgnewtable != NULL)
return trigger->tgname;
}
}
return NULL;
}
/*
* Make a TransitionCaptureState object from a given TriggerDesc. The
* resulting object holds the flags which control whether transition tuples
* are collected when tables are modified. This allows us to use the flags
* from a parent table to control the collection of transition tuples from
* child tables.
*
* If there are no triggers with transition tables configured for 'trigdesc',
* then return NULL.
*
* The resulting object can be passed to the ExecAR* functions. The caller
* should set tcs_map or tcs_original_insert_tuple as appropriate when dealing
* with child tables.
*/
TransitionCaptureState *
MakeTransitionCaptureState(TriggerDesc *trigdesc)
{
TransitionCaptureState *state = NULL;
if (trigdesc != NULL &&
(trigdesc->trig_delete_old_table || trigdesc->trig_update_old_table ||
trigdesc->trig_update_new_table || trigdesc->trig_insert_new_table))
{
state = (TransitionCaptureState *)
palloc0(sizeof(TransitionCaptureState));
state->tcs_delete_old_table = trigdesc->trig_delete_old_table;
state->tcs_update_old_table = trigdesc->trig_update_old_table;
state->tcs_update_new_table = trigdesc->trig_update_new_table;
state->tcs_insert_new_table = trigdesc->trig_insert_new_table;
}
return state;
}
/*
* Call a trigger function.
*
@ -2192,7 +2266,7 @@ ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo)
if (trigdesc && trigdesc->trig_insert_after_statement)
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_INSERT,
false, NULL, NULL, NIL, NULL);
false, NULL, NULL, NIL, NULL, NULL);
}
TupleTableSlot *
@ -2263,14 +2337,18 @@ ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo,
void
ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo,
HeapTuple trigtuple, List *recheckIndexes)
HeapTuple trigtuple, List *recheckIndexes,
TransitionCaptureState *transition_capture)
{
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc &&
(trigdesc->trig_insert_after_row || trigdesc->trig_insert_new_table))
if ((trigdesc && trigdesc->trig_insert_after_row) ||
(trigdesc && !transition_capture && trigdesc->trig_insert_new_table) ||
(transition_capture && transition_capture->tcs_insert_new_table))
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_INSERT,
true, NULL, trigtuple, recheckIndexes, NULL);
true, NULL, trigtuple,
recheckIndexes, NULL,
transition_capture);
}
TupleTableSlot *
@ -2398,7 +2476,7 @@ ExecASDeleteTriggers(EState *estate, ResultRelInfo *relinfo)
if (trigdesc && trigdesc->trig_delete_after_statement)
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
false, NULL, NULL, NIL, NULL);
false, NULL, NULL, NIL, NULL, NULL);
}
bool
@ -2473,12 +2551,14 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
void
ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo,
ItemPointer tupleid,
HeapTuple fdw_trigtuple)
HeapTuple fdw_trigtuple,
TransitionCaptureState *transition_capture)
{
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc &&
(trigdesc->trig_delete_after_row || trigdesc->trig_delete_old_table))
if ((trigdesc && trigdesc->trig_delete_after_row) ||
(trigdesc && !transition_capture && trigdesc->trig_delete_old_table) ||
(transition_capture && transition_capture->tcs_delete_old_table))
{
HeapTuple trigtuple;
@ -2494,7 +2574,8 @@ ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo,
trigtuple = fdw_trigtuple;
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
true, trigtuple, NULL, NIL, NULL);
true, trigtuple, NULL, NIL, NULL,
transition_capture);
if (trigtuple != fdw_trigtuple)
heap_freetuple(trigtuple);
}
@ -2610,7 +2691,8 @@ ExecASUpdateTriggers(EState *estate, ResultRelInfo *relinfo)
if (trigdesc && trigdesc->trig_update_after_statement)
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
false, NULL, NULL, NIL,
GetUpdatedColumns(relinfo, estate));
GetUpdatedColumns(relinfo, estate),
NULL);
}
TupleTableSlot *
@ -2735,12 +2817,18 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
ItemPointer tupleid,
HeapTuple fdw_trigtuple,
HeapTuple newtuple,
List *recheckIndexes)
List *recheckIndexes,
TransitionCaptureState *transition_capture)
{
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc && (trigdesc->trig_update_after_row ||
trigdesc->trig_update_old_table || trigdesc->trig_update_new_table))
if ((trigdesc && trigdesc->trig_update_after_row) ||
(trigdesc && !transition_capture &&
(trigdesc->trig_update_old_table ||
trigdesc->trig_update_new_table)) ||
(transition_capture &&
(transition_capture->tcs_update_old_table ||
transition_capture->tcs_update_new_table)))
{
HeapTuple trigtuple;
@ -2757,7 +2845,8 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
true, trigtuple, newtuple, recheckIndexes,
GetUpdatedColumns(relinfo, estate));
GetUpdatedColumns(relinfo, estate),
transition_capture);
if (trigtuple != fdw_trigtuple)
heap_freetuple(trigtuple);
}
@ -2888,7 +2977,7 @@ ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo)
if (trigdesc && trigdesc->trig_truncate_after_statement)
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_TRUNCATE,
false, NULL, NULL, NIL, NULL);
false, NULL, NULL, NIL, NULL, NULL);
}
@ -5090,7 +5179,8 @@ static void
AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int event, bool row_trigger,
HeapTuple oldtup, HeapTuple newtup,
List *recheckIndexes, Bitmapset *modifiedCols)
List *recheckIndexes, Bitmapset *modifiedCols,
TransitionCaptureState *transition_capture)
{
Relation rel = relinfo->ri_RelationDesc;
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
@ -5120,10 +5210,49 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
*/
if (row_trigger)
{
if ((event == TRIGGER_EVENT_DELETE &&
trigdesc->trig_delete_old_table) ||
(event == TRIGGER_EVENT_UPDATE &&
trigdesc->trig_update_old_table))
HeapTuple original_insert_tuple = NULL;
TupleConversionMap *map = NULL;
bool delete_old_table = false;
bool update_old_table = false;
bool update_new_table = false;
bool insert_new_table = false;
if (transition_capture != NULL)
{
/*
* A TransitionCaptureState object was provided to tell us which
* tuples to capture based on a parent table named in a DML
* statement. We may be dealing with a child table with an
* incompatible TupleDescriptor, in which case we'll need a map to
* convert them. As a small optimization, we may receive the
* original tuple from an insertion into a partitioned table to
* avoid a wasteful parent->child->parent round trip.
*/
delete_old_table = transition_capture->tcs_delete_old_table;
update_old_table = transition_capture->tcs_update_old_table;
update_new_table = transition_capture->tcs_update_new_table;
insert_new_table = transition_capture->tcs_insert_new_table;
map = transition_capture->tcs_map;
original_insert_tuple =
transition_capture->tcs_original_insert_tuple;
}
else if (trigdesc != NULL)
{
/*
* Check if we need to capture transition tuples for triggers
* defined on this relation directly. This case is useful for
* cases like execReplication.c which don't set up a
* TriggerCaptureState because they don't know how to work with
* partitions.
*/
delete_old_table = trigdesc->trig_delete_old_table;
update_old_table = trigdesc->trig_update_old_table;
update_new_table = trigdesc->trig_update_new_table;
insert_new_table = trigdesc->trig_insert_new_table;
}
if ((event == TRIGGER_EVENT_DELETE && delete_old_table) ||
(event == TRIGGER_EVENT_UPDATE && update_old_table))
{
Tuplestorestate *old_tuplestore;
@ -5131,12 +5260,18 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
old_tuplestore =
GetTriggerTransitionTuplestore
(afterTriggers.old_tuplestores);
tuplestore_puttuple(old_tuplestore, oldtup);
if (map != NULL)
{
HeapTuple converted = do_convert_tuple(oldtup, map);
tuplestore_puttuple(old_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(old_tuplestore, oldtup);
}
if ((event == TRIGGER_EVENT_INSERT &&
trigdesc->trig_insert_new_table) ||
(event == TRIGGER_EVENT_UPDATE &&
trigdesc->trig_update_new_table))
if ((event == TRIGGER_EVENT_INSERT && insert_new_table) ||
(event == TRIGGER_EVENT_UPDATE && update_new_table))
{
Tuplestorestate *new_tuplestore;
@ -5144,11 +5279,22 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
new_tuplestore =
GetTriggerTransitionTuplestore
(afterTriggers.new_tuplestores);
tuplestore_puttuple(new_tuplestore, newtup);
if (original_insert_tuple != NULL)
tuplestore_puttuple(new_tuplestore, original_insert_tuple);
else if (map != NULL)
{
HeapTuple converted = do_convert_tuple(newtup, map);
tuplestore_puttuple(new_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(new_tuplestore, newtup);
}
/* If transition tables are the only reason we're here, return. */
if ((event == TRIGGER_EVENT_DELETE && !trigdesc->trig_delete_after_row) ||
if (trigdesc == NULL ||
(event == TRIGGER_EVENT_DELETE && !trigdesc->trig_delete_after_row) ||
(event == TRIGGER_EVENT_INSERT && !trigdesc->trig_insert_after_row) ||
(event == TRIGGER_EVENT_UPDATE && !trigdesc->trig_update_after_row))
return;