mirror of
https://github.com/postgres/postgres.git
synced 2025-06-30 21:42:05 +03:00
Offer triggers on foreign tables.
This covers all the SQL-standard trigger types supported for regular tables; it does not cover constraint triggers. The approach for acquiring the old row mirrors that for view INSTEAD OF triggers. For AFTER ROW triggers, we spool the foreign tuples to a tuplestore. This changes the FDW API contract; when deciding which columns to populate in the slot returned from data modification callbacks, writable FDWs will need to check for AFTER ROW triggers in addition to checking for a RETURNING clause. In support of the feature addition, refactor the TriggerFlags bits and the assembly of old tuples in ModifyTable. Ronan Dunklau, reviewed by KaiGai Kohei; some additional hacking by me.
This commit is contained in:
@ -56,6 +56,7 @@
|
||||
#include "utils/snapmgr.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/tqual.h"
|
||||
#include "utils/tuplestore.h"
|
||||
|
||||
|
||||
/* GUC variables */
|
||||
@ -195,6 +196,30 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
|
||||
RelationGetRelationName(rel)),
|
||||
errdetail("Views cannot have TRUNCATE triggers.")));
|
||||
}
|
||||
else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
if (stmt->timing != TRIGGER_TYPE_BEFORE &&
|
||||
stmt->timing != TRIGGER_TYPE_AFTER)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a foreign table",
|
||||
RelationGetRelationName(rel)),
|
||||
errdetail("Foreign tables cannot have INSTEAD OF triggers.")));
|
||||
|
||||
if (TRIGGER_FOR_TRUNCATE(stmt->events))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a foreign table",
|
||||
RelationGetRelationName(rel)),
|
||||
errdetail("Foreign tables cannot have TRUNCATE triggers.")));
|
||||
|
||||
if (stmt->isconstraint)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a foreign table",
|
||||
RelationGetRelationName(rel)),
|
||||
errdetail("Foreign tables cannot have constraint triggers.")));
|
||||
}
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
@ -1080,10 +1105,11 @@ RemoveTriggerById(Oid trigOid)
|
||||
rel = heap_open(relid, AccessExclusiveLock);
|
||||
|
||||
if (rel->rd_rel->relkind != RELKIND_RELATION &&
|
||||
rel->rd_rel->relkind != RELKIND_VIEW)
|
||||
rel->rd_rel->relkind != RELKIND_VIEW &&
|
||||
rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is not a table or view",
|
||||
errmsg("\"%s\" is not a table, view, or foreign table",
|
||||
RelationGetRelationName(rel))));
|
||||
|
||||
if (!allowSystemTableMods && IsSystemRelation(rel))
|
||||
@ -1184,10 +1210,12 @@ RangeVarCallbackForRenameTrigger(const RangeVar *rv, Oid relid, Oid oldrelid,
|
||||
form = (Form_pg_class) GETSTRUCT(tuple);
|
||||
|
||||
/* only tables and views can have triggers */
|
||||
if (form->relkind != RELKIND_RELATION && form->relkind != RELKIND_VIEW)
|
||||
if (form->relkind != RELKIND_RELATION && form->relkind != RELKIND_VIEW &&
|
||||
form->relkind != RELKIND_FOREIGN_TABLE)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is not a table or view", rv->relname)));
|
||||
errmsg("\"%s\" is not a table, view, or foreign table",
|
||||
rv->relname)));
|
||||
|
||||
/* you must own the table to rename one of its triggers */
|
||||
if (!pg_class_ownercheck(relid, GetUserId()))
|
||||
@ -2164,7 +2192,8 @@ ExecASDeleteTriggers(EState *estate, ResultRelInfo *relinfo)
|
||||
bool
|
||||
ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
|
||||
ResultRelInfo *relinfo,
|
||||
ItemPointer tupleid)
|
||||
ItemPointer tupleid,
|
||||
HeapTuple fdw_trigtuple)
|
||||
{
|
||||
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
|
||||
bool result = true;
|
||||
@ -2174,10 +2203,16 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
|
||||
TupleTableSlot *newSlot;
|
||||
int i;
|
||||
|
||||
trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
|
||||
LockTupleExclusive, &newSlot);
|
||||
if (trigtuple == NULL)
|
||||
return false;
|
||||
Assert(HeapTupleIsValid(fdw_trigtuple) ^ ItemPointerIsValid(tupleid));
|
||||
if (fdw_trigtuple == NULL)
|
||||
{
|
||||
trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
|
||||
LockTupleExclusive, &newSlot);
|
||||
if (trigtuple == NULL)
|
||||
return false;
|
||||
}
|
||||
else
|
||||
trigtuple = fdw_trigtuple;
|
||||
|
||||
LocTriggerData.type = T_TriggerData;
|
||||
LocTriggerData.tg_event = TRIGGER_EVENT_DELETE |
|
||||
@ -2215,29 +2250,38 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
|
||||
if (newtuple != trigtuple)
|
||||
heap_freetuple(newtuple);
|
||||
}
|
||||
heap_freetuple(trigtuple);
|
||||
if (trigtuple != fdw_trigtuple)
|
||||
heap_freetuple(trigtuple);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void
|
||||
ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo,
|
||||
ItemPointer tupleid)
|
||||
ItemPointer tupleid,
|
||||
HeapTuple fdw_trigtuple)
|
||||
{
|
||||
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
|
||||
|
||||
if (trigdesc && trigdesc->trig_delete_after_row)
|
||||
{
|
||||
HeapTuple trigtuple = GetTupleForTrigger(estate,
|
||||
NULL,
|
||||
relinfo,
|
||||
tupleid,
|
||||
LockTupleExclusive,
|
||||
NULL);
|
||||
HeapTuple trigtuple;
|
||||
|
||||
Assert(HeapTupleIsValid(fdw_trigtuple) ^ ItemPointerIsValid(tupleid));
|
||||
if (fdw_trigtuple == NULL)
|
||||
trigtuple = GetTupleForTrigger(estate,
|
||||
NULL,
|
||||
relinfo,
|
||||
tupleid,
|
||||
LockTupleExclusive,
|
||||
NULL);
|
||||
else
|
||||
trigtuple = fdw_trigtuple;
|
||||
|
||||
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
|
||||
true, trigtuple, NULL, NIL, NULL);
|
||||
heap_freetuple(trigtuple);
|
||||
if (trigtuple != fdw_trigtuple)
|
||||
heap_freetuple(trigtuple);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2353,7 +2397,9 @@ ExecASUpdateTriggers(EState *estate, ResultRelInfo *relinfo)
|
||||
TupleTableSlot *
|
||||
ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
|
||||
ResultRelInfo *relinfo,
|
||||
ItemPointer tupleid, TupleTableSlot *slot)
|
||||
ItemPointer tupleid,
|
||||
HeapTuple fdw_trigtuple,
|
||||
TupleTableSlot *slot)
|
||||
{
|
||||
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
|
||||
HeapTuple slottuple = ExecMaterializeSlot(slot);
|
||||
@ -2380,11 +2426,20 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
|
||||
else
|
||||
lockmode = LockTupleNoKeyExclusive;
|
||||
|
||||
/* get a copy of the on-disk tuple we are planning to update */
|
||||
trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
|
||||
lockmode, &newSlot);
|
||||
if (trigtuple == NULL)
|
||||
return NULL; /* cancel the update action */
|
||||
Assert(HeapTupleIsValid(fdw_trigtuple) ^ ItemPointerIsValid(tupleid));
|
||||
if (fdw_trigtuple == NULL)
|
||||
{
|
||||
/* get a copy of the on-disk tuple we are planning to update */
|
||||
trigtuple = GetTupleForTrigger(estate, epqstate, relinfo, tupleid,
|
||||
lockmode, &newSlot);
|
||||
if (trigtuple == NULL)
|
||||
return NULL; /* cancel the update action */
|
||||
}
|
||||
else
|
||||
{
|
||||
trigtuple = fdw_trigtuple;
|
||||
newSlot = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* In READ COMMITTED isolation level it's possible that target tuple was
|
||||
@ -2437,11 +2492,13 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
|
||||
heap_freetuple(oldtuple);
|
||||
if (newtuple == NULL)
|
||||
{
|
||||
heap_freetuple(trigtuple);
|
||||
if (trigtuple != fdw_trigtuple)
|
||||
heap_freetuple(trigtuple);
|
||||
return NULL; /* "do nothing" */
|
||||
}
|
||||
}
|
||||
heap_freetuple(trigtuple);
|
||||
if (trigtuple != fdw_trigtuple)
|
||||
heap_freetuple(trigtuple);
|
||||
|
||||
if (newtuple != slottuple)
|
||||
{
|
||||
@ -2464,24 +2521,33 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
|
||||
|
||||
void
|
||||
ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
|
||||
ItemPointer tupleid, HeapTuple newtuple,
|
||||
ItemPointer tupleid,
|
||||
HeapTuple fdw_trigtuple,
|
||||
HeapTuple newtuple,
|
||||
List *recheckIndexes)
|
||||
{
|
||||
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
|
||||
|
||||
if (trigdesc && trigdesc->trig_update_after_row)
|
||||
{
|
||||
HeapTuple trigtuple = GetTupleForTrigger(estate,
|
||||
NULL,
|
||||
relinfo,
|
||||
tupleid,
|
||||
LockTupleExclusive,
|
||||
NULL);
|
||||
HeapTuple trigtuple;
|
||||
|
||||
Assert(HeapTupleIsValid(fdw_trigtuple) ^ ItemPointerIsValid(tupleid));
|
||||
if (fdw_trigtuple == NULL)
|
||||
trigtuple = GetTupleForTrigger(estate,
|
||||
NULL,
|
||||
relinfo,
|
||||
tupleid,
|
||||
LockTupleExclusive,
|
||||
NULL);
|
||||
else
|
||||
trigtuple = fdw_trigtuple;
|
||||
|
||||
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
|
||||
true, trigtuple, newtuple, recheckIndexes,
|
||||
GetModifiedColumns(relinfo, estate));
|
||||
heap_freetuple(trigtuple);
|
||||
if (trigtuple != fdw_trigtuple)
|
||||
heap_freetuple(trigtuple);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2942,13 +3008,22 @@ typedef SetConstraintStateData *SetConstraintState;
|
||||
* Per-trigger-event data
|
||||
*
|
||||
* The actual per-event data, AfterTriggerEventData, includes DONE/IN_PROGRESS
|
||||
* status bits and one or two tuple CTIDs. Each event record also has an
|
||||
* associated AfterTriggerSharedData that is shared across all instances
|
||||
* of similar events within a "chunk".
|
||||
* status bits and up to two tuple CTIDs. Each event record also has an
|
||||
* associated AfterTriggerSharedData that is shared across all instances of
|
||||
* similar events within a "chunk".
|
||||
*
|
||||
* We arrange not to waste storage on ate_ctid2 for non-update events.
|
||||
* We could go further and not store either ctid for statement-level triggers,
|
||||
* but that seems unlikely to be worth the trouble.
|
||||
* For row-level triggers, we arrange not to waste storage on unneeded ctid
|
||||
* fields. Updates of regular tables use two; inserts and deletes of regular
|
||||
* tables use one; foreign tables always use zero and save the tuple(s) to a
|
||||
* tuplestore. AFTER_TRIGGER_FDW_FETCH directs AfterTriggerExecute() to
|
||||
* retrieve a fresh tuple or pair of tuples from that tuplestore, while
|
||||
* AFTER_TRIGGER_FDW_REUSE directs it to use the most-recently-retrieved
|
||||
* tuple(s). This permits storing tuples once regardless of the number of
|
||||
* row-level triggers on a foreign table.
|
||||
*
|
||||
* Statement-level triggers always bear AFTER_TRIGGER_1CTID, though they
|
||||
* require no ctid field. We lack the flag bit space to neatly represent that
|
||||
* distinct case, and it seems unlikely to be worth much trouble.
|
||||
*
|
||||
* Note: ats_firing_id is initially zero and is set to something else when
|
||||
* AFTER_TRIGGER_IN_PROGRESS is set. It indicates which trigger firing
|
||||
@ -2963,9 +3038,14 @@ typedef uint32 TriggerFlags;
|
||||
|
||||
#define AFTER_TRIGGER_OFFSET 0x0FFFFFFF /* must be low-order
|
||||
* bits */
|
||||
#define AFTER_TRIGGER_2CTIDS 0x10000000
|
||||
#define AFTER_TRIGGER_DONE 0x20000000
|
||||
#define AFTER_TRIGGER_IN_PROGRESS 0x40000000
|
||||
#define AFTER_TRIGGER_DONE 0x10000000
|
||||
#define AFTER_TRIGGER_IN_PROGRESS 0x20000000
|
||||
/* bits describing the size and tuple sources of this event */
|
||||
#define AFTER_TRIGGER_FDW_REUSE 0x00000000
|
||||
#define AFTER_TRIGGER_FDW_FETCH 0x80000000
|
||||
#define AFTER_TRIGGER_1CTID 0x40000000
|
||||
#define AFTER_TRIGGER_2CTID 0xC0000000
|
||||
#define AFTER_TRIGGER_TUP_BITS 0xC0000000
|
||||
|
||||
typedef struct AfterTriggerSharedData *AfterTriggerShared;
|
||||
|
||||
@ -2986,16 +3066,25 @@ typedef struct AfterTriggerEventData
|
||||
ItemPointerData ate_ctid2; /* new updated tuple */
|
||||
} AfterTriggerEventData;
|
||||
|
||||
/* This struct must exactly match the one above except for not having ctid2 */
|
||||
/* AfterTriggerEventData, minus ate_ctid2 */
|
||||
typedef struct AfterTriggerEventDataOneCtid
|
||||
{
|
||||
TriggerFlags ate_flags; /* status bits and offset to shared data */
|
||||
ItemPointerData ate_ctid1; /* inserted, deleted, or old updated tuple */
|
||||
} AfterTriggerEventDataOneCtid;
|
||||
|
||||
/* AfterTriggerEventData, minus ate_ctid1 and ate_ctid2 */
|
||||
typedef struct AfterTriggerEventDataZeroCtids
|
||||
{
|
||||
TriggerFlags ate_flags; /* status bits and offset to shared data */
|
||||
} AfterTriggerEventDataZeroCtids;
|
||||
|
||||
#define SizeofTriggerEvent(evt) \
|
||||
(((evt)->ate_flags & AFTER_TRIGGER_2CTIDS) ? \
|
||||
sizeof(AfterTriggerEventData) : sizeof(AfterTriggerEventDataOneCtid))
|
||||
(((evt)->ate_flags & AFTER_TRIGGER_TUP_BITS) == AFTER_TRIGGER_2CTID ? \
|
||||
sizeof(AfterTriggerEventData) : \
|
||||
((evt)->ate_flags & AFTER_TRIGGER_TUP_BITS) == AFTER_TRIGGER_1CTID ? \
|
||||
sizeof(AfterTriggerEventDataOneCtid) : \
|
||||
sizeof(AfterTriggerEventDataZeroCtids))
|
||||
|
||||
#define GetTriggerSharedData(evt) \
|
||||
((AfterTriggerShared) ((char *) (evt) + ((evt)->ate_flags & AFTER_TRIGGER_OFFSET)))
|
||||
@ -3068,7 +3157,11 @@ typedef struct AfterTriggerEventList
|
||||
* immediate-mode triggers, and append any deferred events to the main events
|
||||
* list.
|
||||
*
|
||||
* maxquerydepth is just the allocated length of query_stack.
|
||||
* fdw_tuplestores[query_depth] is a tuplestore containing the foreign tuples
|
||||
* needed for the current query.
|
||||
*
|
||||
* maxquerydepth is just the allocated length of query_stack and
|
||||
* fdw_tuplestores.
|
||||
*
|
||||
* state_stack is a stack of pointers to saved copies of the SET CONSTRAINTS
|
||||
* state data; each subtransaction level that modifies that state first
|
||||
@ -3097,6 +3190,7 @@ typedef struct AfterTriggersData
|
||||
AfterTriggerEventList events; /* deferred-event list */
|
||||
int query_depth; /* current query list index */
|
||||
AfterTriggerEventList *query_stack; /* events pending from each query */
|
||||
Tuplestorestate **fdw_tuplestores; /* foreign tuples from each query */
|
||||
int maxquerydepth; /* allocated len of above array */
|
||||
MemoryContext event_cxt; /* memory context for events, if any */
|
||||
|
||||
@ -3113,18 +3207,60 @@ typedef AfterTriggersData *AfterTriggers;
|
||||
|
||||
static AfterTriggers afterTriggers;
|
||||
|
||||
|
||||
static void AfterTriggerExecute(AfterTriggerEvent event,
|
||||
Relation rel, TriggerDesc *trigdesc,
|
||||
FmgrInfo *finfo,
|
||||
Instrumentation *instr,
|
||||
MemoryContext per_tuple_context);
|
||||
MemoryContext per_tuple_context,
|
||||
TupleTableSlot *trig_tuple_slot1,
|
||||
TupleTableSlot *trig_tuple_slot2);
|
||||
static SetConstraintState SetConstraintStateCreate(int numalloc);
|
||||
static SetConstraintState SetConstraintStateCopy(SetConstraintState state);
|
||||
static SetConstraintState SetConstraintStateAddItem(SetConstraintState state,
|
||||
Oid tgoid, bool tgisdeferred);
|
||||
|
||||
|
||||
/*
|
||||
* Gets the current query fdw tuplestore and initializes it if necessary
|
||||
*/
|
||||
static Tuplestorestate *
|
||||
GetCurrentFDWTuplestore()
|
||||
{
|
||||
Tuplestorestate *ret;
|
||||
|
||||
ret = afterTriggers->fdw_tuplestores[afterTriggers->query_depth];
|
||||
if (ret == NULL)
|
||||
{
|
||||
MemoryContext oldcxt;
|
||||
ResourceOwner saveResourceOwner;
|
||||
|
||||
/*
|
||||
* Make the tuplestore valid until end of transaction. This is the
|
||||
* allocation lifespan of the associated events list, but we really
|
||||
* only need it until AfterTriggerEndQuery().
|
||||
*/
|
||||
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
|
||||
saveResourceOwner = CurrentResourceOwner;
|
||||
PG_TRY();
|
||||
{
|
||||
CurrentResourceOwner = TopTransactionResourceOwner;
|
||||
ret = tuplestore_begin_heap(false, false, work_mem);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
CurrentResourceOwner = saveResourceOwner;
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
CurrentResourceOwner = saveResourceOwner;
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
|
||||
afterTriggers->fdw_tuplestores[afterTriggers->query_depth] = ret;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* ----------
|
||||
* afterTriggerCheckState()
|
||||
*
|
||||
@ -3365,13 +3501,17 @@ afterTriggerRestoreEventList(AfterTriggerEventList *events,
|
||||
* instr: array of EXPLAIN ANALYZE instrumentation nodes (one per trigger),
|
||||
* or NULL if no instrumentation is wanted.
|
||||
* per_tuple_context: memory context to call trigger function in.
|
||||
* trig_tuple_slot1: scratch slot for tg_trigtuple (foreign tables only)
|
||||
* trig_tuple_slot2: scratch slot for tg_newtuple (foreign tables only)
|
||||
* ----------
|
||||
*/
|
||||
static void
|
||||
AfterTriggerExecute(AfterTriggerEvent event,
|
||||
Relation rel, TriggerDesc *trigdesc,
|
||||
FmgrInfo *finfo, Instrumentation *instr,
|
||||
MemoryContext per_tuple_context)
|
||||
MemoryContext per_tuple_context,
|
||||
TupleTableSlot *trig_tuple_slot1,
|
||||
TupleTableSlot *trig_tuple_slot2)
|
||||
{
|
||||
AfterTriggerShared evtshared = GetTriggerSharedData(event);
|
||||
Oid tgoid = evtshared->ats_tgoid;
|
||||
@ -3408,34 +3548,76 @@ AfterTriggerExecute(AfterTriggerEvent event,
|
||||
/*
|
||||
* Fetch the required tuple(s).
|
||||
*/
|
||||
if (ItemPointerIsValid(&(event->ate_ctid1)))
|
||||
switch (event->ate_flags & AFTER_TRIGGER_TUP_BITS)
|
||||
{
|
||||
ItemPointerCopy(&(event->ate_ctid1), &(tuple1.t_self));
|
||||
if (!heap_fetch(rel, SnapshotAny, &tuple1, &buffer1, false, NULL))
|
||||
elog(ERROR, "failed to fetch tuple1 for AFTER trigger");
|
||||
LocTriggerData.tg_trigtuple = &tuple1;
|
||||
LocTriggerData.tg_trigtuplebuf = buffer1;
|
||||
}
|
||||
else
|
||||
{
|
||||
LocTriggerData.tg_trigtuple = NULL;
|
||||
LocTriggerData.tg_trigtuplebuf = InvalidBuffer;
|
||||
}
|
||||
case AFTER_TRIGGER_FDW_FETCH:
|
||||
{
|
||||
Tuplestorestate *fdw_tuplestore = GetCurrentFDWTuplestore();
|
||||
|
||||
/* don't touch ctid2 if not there */
|
||||
if ((event->ate_flags & AFTER_TRIGGER_2CTIDS) &&
|
||||
ItemPointerIsValid(&(event->ate_ctid2)))
|
||||
{
|
||||
ItemPointerCopy(&(event->ate_ctid2), &(tuple2.t_self));
|
||||
if (!heap_fetch(rel, SnapshotAny, &tuple2, &buffer2, false, NULL))
|
||||
elog(ERROR, "failed to fetch tuple2 for AFTER trigger");
|
||||
LocTriggerData.tg_newtuple = &tuple2;
|
||||
LocTriggerData.tg_newtuplebuf = buffer2;
|
||||
}
|
||||
else
|
||||
{
|
||||
LocTriggerData.tg_newtuple = NULL;
|
||||
LocTriggerData.tg_newtuplebuf = InvalidBuffer;
|
||||
if (!tuplestore_gettupleslot(fdw_tuplestore, true, false,
|
||||
trig_tuple_slot1))
|
||||
elog(ERROR, "failed to fetch tuple1 for AFTER trigger");
|
||||
|
||||
if ((evtshared->ats_event & TRIGGER_EVENT_OPMASK) ==
|
||||
TRIGGER_EVENT_UPDATE &&
|
||||
!tuplestore_gettupleslot(fdw_tuplestore, true, false,
|
||||
trig_tuple_slot2))
|
||||
elog(ERROR, "failed to fetch tuple2 for AFTER trigger");
|
||||
}
|
||||
/* fall through */
|
||||
case AFTER_TRIGGER_FDW_REUSE:
|
||||
/*
|
||||
* Using ExecMaterializeSlot() rather than ExecFetchSlotTuple()
|
||||
* ensures that tg_trigtuple does not reference tuplestore memory.
|
||||
* (It is formally possible for the trigger function to queue
|
||||
* trigger events that add to the same tuplestore, which can push
|
||||
* other tuples out of memory.) The distinction is academic,
|
||||
* because we start with a minimal tuple that ExecFetchSlotTuple()
|
||||
* must materialize anyway.
|
||||
*/
|
||||
LocTriggerData.tg_trigtuple =
|
||||
ExecMaterializeSlot(trig_tuple_slot1);
|
||||
LocTriggerData.tg_trigtuplebuf = InvalidBuffer;
|
||||
|
||||
LocTriggerData.tg_newtuple =
|
||||
((evtshared->ats_event & TRIGGER_EVENT_OPMASK) ==
|
||||
TRIGGER_EVENT_UPDATE) ?
|
||||
ExecMaterializeSlot(trig_tuple_slot2) : NULL;
|
||||
LocTriggerData.tg_newtuplebuf = InvalidBuffer;
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
if (ItemPointerIsValid(&(event->ate_ctid1)))
|
||||
{
|
||||
ItemPointerCopy(&(event->ate_ctid1), &(tuple1.t_self));
|
||||
if (!heap_fetch(rel, SnapshotAny, &tuple1, &buffer1, false, NULL))
|
||||
elog(ERROR, "failed to fetch tuple1 for AFTER trigger");
|
||||
LocTriggerData.tg_trigtuple = &tuple1;
|
||||
LocTriggerData.tg_trigtuplebuf = buffer1;
|
||||
}
|
||||
else
|
||||
{
|
||||
LocTriggerData.tg_trigtuple = NULL;
|
||||
LocTriggerData.tg_trigtuplebuf = InvalidBuffer;
|
||||
}
|
||||
|
||||
/* don't touch ctid2 if not there */
|
||||
if ((event->ate_flags & AFTER_TRIGGER_TUP_BITS) ==
|
||||
AFTER_TRIGGER_2CTID &&
|
||||
ItemPointerIsValid(&(event->ate_ctid2)))
|
||||
{
|
||||
ItemPointerCopy(&(event->ate_ctid2), &(tuple2.t_self));
|
||||
if (!heap_fetch(rel, SnapshotAny, &tuple2, &buffer2, false, NULL))
|
||||
elog(ERROR, "failed to fetch tuple2 for AFTER trigger");
|
||||
LocTriggerData.tg_newtuple = &tuple2;
|
||||
LocTriggerData.tg_newtuplebuf = buffer2;
|
||||
}
|
||||
else
|
||||
{
|
||||
LocTriggerData.tg_newtuple = NULL;
|
||||
LocTriggerData.tg_newtuplebuf = InvalidBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -3457,7 +3639,9 @@ AfterTriggerExecute(AfterTriggerEvent event,
|
||||
finfo,
|
||||
NULL,
|
||||
per_tuple_context);
|
||||
if (rettuple != NULL && rettuple != &tuple1 && rettuple != &tuple2)
|
||||
if (rettuple != NULL &&
|
||||
rettuple != LocTriggerData.tg_trigtuple &&
|
||||
rettuple != LocTriggerData.tg_newtuple)
|
||||
heap_freetuple(rettuple);
|
||||
|
||||
/*
|
||||
@ -3577,6 +3761,8 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
|
||||
TriggerDesc *trigdesc = NULL;
|
||||
FmgrInfo *finfo = NULL;
|
||||
Instrumentation *instr = NULL;
|
||||
TupleTableSlot *slot1 = NULL,
|
||||
*slot2 = NULL;
|
||||
|
||||
/* Make a local EState if need be */
|
||||
if (estate == NULL)
|
||||
@ -3621,6 +3807,16 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
|
||||
trigdesc = rInfo->ri_TrigDesc;
|
||||
finfo = rInfo->ri_TrigFunctions;
|
||||
instr = rInfo->ri_TrigInstrument;
|
||||
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
if (slot1 != NULL)
|
||||
{
|
||||
ExecDropSingleTupleTableSlot(slot1);
|
||||
ExecDropSingleTupleTableSlot(slot2);
|
||||
}
|
||||
slot1 = MakeSingleTupleTableSlot(rel->rd_att);
|
||||
slot2 = MakeSingleTupleTableSlot(rel->rd_att);
|
||||
}
|
||||
if (trigdesc == NULL) /* should not happen */
|
||||
elog(ERROR, "relation %u has no triggers",
|
||||
evtshared->ats_relid);
|
||||
@ -3632,7 +3828,7 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
|
||||
* won't try to re-fire it.
|
||||
*/
|
||||
AfterTriggerExecute(event, rel, trigdesc, finfo, instr,
|
||||
per_tuple_context);
|
||||
per_tuple_context, slot1, slot2);
|
||||
|
||||
/*
|
||||
* Mark the event as done.
|
||||
@ -3663,6 +3859,11 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events,
|
||||
events->tailfree = chunk->freeptr;
|
||||
}
|
||||
}
|
||||
if (slot1 != NULL)
|
||||
{
|
||||
ExecDropSingleTupleTableSlot(slot1);
|
||||
ExecDropSingleTupleTableSlot(slot2);
|
||||
}
|
||||
|
||||
/* Release working resources */
|
||||
MemoryContextDelete(per_tuple_context);
|
||||
@ -3712,10 +3913,13 @@ AfterTriggerBeginXact(void)
|
||||
afterTriggers->events.tailfree = NULL;
|
||||
afterTriggers->query_depth = -1;
|
||||
|
||||
/* We initialize the query stack to a reasonable size */
|
||||
/* We initialize the arrays to a reasonable size */
|
||||
afterTriggers->query_stack = (AfterTriggerEventList *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
8 * sizeof(AfterTriggerEventList));
|
||||
afterTriggers->fdw_tuplestores = (Tuplestorestate **)
|
||||
MemoryContextAllocZero(TopTransactionContext,
|
||||
8 * sizeof(Tuplestorestate *));
|
||||
afterTriggers->maxquerydepth = 8;
|
||||
|
||||
/* Context for events is created only when needed */
|
||||
@ -3756,11 +3960,18 @@ AfterTriggerBeginQuery(void)
|
||||
if (afterTriggers->query_depth >= afterTriggers->maxquerydepth)
|
||||
{
|
||||
/* repalloc will keep the stack in the same context */
|
||||
int new_alloc = afterTriggers->maxquerydepth * 2;
|
||||
int old_alloc = afterTriggers->maxquerydepth;
|
||||
int new_alloc = old_alloc * 2;
|
||||
|
||||
afterTriggers->query_stack = (AfterTriggerEventList *)
|
||||
repalloc(afterTriggers->query_stack,
|
||||
new_alloc * sizeof(AfterTriggerEventList));
|
||||
afterTriggers->fdw_tuplestores = (Tuplestorestate **)
|
||||
repalloc(afterTriggers->fdw_tuplestores,
|
||||
new_alloc * sizeof(Tuplestorestate *));
|
||||
/* Clear newly-allocated slots for subsequent lazy initialization. */
|
||||
memset(afterTriggers->fdw_tuplestores + old_alloc,
|
||||
0, (new_alloc - old_alloc) * sizeof(Tuplestorestate *));
|
||||
afterTriggers->maxquerydepth = new_alloc;
|
||||
}
|
||||
|
||||
@ -3788,6 +3999,7 @@ void
|
||||
AfterTriggerEndQuery(EState *estate)
|
||||
{
|
||||
AfterTriggerEventList *events;
|
||||
Tuplestorestate *fdw_tuplestore;
|
||||
|
||||
/* Must be inside a transaction */
|
||||
Assert(afterTriggers != NULL);
|
||||
@ -3832,7 +4044,13 @@ AfterTriggerEndQuery(EState *estate)
|
||||
break;
|
||||
}
|
||||
|
||||
/* Release query-local storage for events */
|
||||
/* Release query-local storage for events, including tuplestore if any */
|
||||
fdw_tuplestore = afterTriggers->fdw_tuplestores[afterTriggers->query_depth];
|
||||
if (fdw_tuplestore)
|
||||
{
|
||||
tuplestore_end(fdw_tuplestore);
|
||||
afterTriggers->fdw_tuplestores[afterTriggers->query_depth] = NULL;
|
||||
}
|
||||
afterTriggerFreeEventList(&afterTriggers->query_stack[afterTriggers->query_depth]);
|
||||
|
||||
afterTriggers->query_depth--;
|
||||
@ -4056,6 +4274,15 @@ AfterTriggerEndSubXact(bool isCommit)
|
||||
*/
|
||||
while (afterTriggers->query_depth > afterTriggers->depth_stack[my_level])
|
||||
{
|
||||
Tuplestorestate *ts;
|
||||
|
||||
ts = afterTriggers->fdw_tuplestores[afterTriggers->query_depth];
|
||||
if (ts)
|
||||
{
|
||||
tuplestore_end(ts);
|
||||
afterTriggers->fdw_tuplestores[afterTriggers->query_depth] = NULL;
|
||||
}
|
||||
|
||||
afterTriggerFreeEventList(&afterTriggers->query_stack[afterTriggers->query_depth]);
|
||||
afterTriggers->query_depth--;
|
||||
}
|
||||
@ -4552,9 +4779,11 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
|
||||
AfterTriggerEventData new_event;
|
||||
AfterTriggerSharedData new_shared;
|
||||
char relkind = relinfo->ri_RelationDesc->rd_rel->relkind;
|
||||
int tgtype_event;
|
||||
int tgtype_level;
|
||||
int i;
|
||||
Tuplestorestate *fdw_tuplestore = NULL;
|
||||
|
||||
/*
|
||||
* Check state. We use normal tests not Asserts because it is possible to
|
||||
@ -4573,7 +4802,6 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
* validation is important to make sure we don't walk off the edge of our
|
||||
* arrays.
|
||||
*/
|
||||
new_event.ate_flags = 0;
|
||||
switch (event)
|
||||
{
|
||||
case TRIGGER_EVENT_INSERT:
|
||||
@ -4618,7 +4846,6 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
Assert(newtup != NULL);
|
||||
ItemPointerCopy(&(oldtup->t_self), &(new_event.ate_ctid1));
|
||||
ItemPointerCopy(&(newtup->t_self), &(new_event.ate_ctid2));
|
||||
new_event.ate_flags |= AFTER_TRIGGER_2CTIDS;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -4641,6 +4868,11 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
break;
|
||||
}
|
||||
|
||||
if (!(relkind == RELKIND_FOREIGN_TABLE && row_trigger))
|
||||
new_event.ate_flags = (row_trigger && event == TRIGGER_EVENT_UPDATE) ?
|
||||
AFTER_TRIGGER_2CTID : AFTER_TRIGGER_1CTID;
|
||||
/* else, we'll initialize ate_flags for each trigger */
|
||||
|
||||
tgtype_level = (row_trigger ? TRIGGER_TYPE_ROW : TRIGGER_TYPE_STATEMENT);
|
||||
|
||||
for (i = 0; i < trigdesc->numtriggers; i++)
|
||||
@ -4656,6 +4888,18 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
modifiedCols, oldtup, newtup))
|
||||
continue;
|
||||
|
||||
if (relkind == RELKIND_FOREIGN_TABLE && row_trigger)
|
||||
{
|
||||
if (fdw_tuplestore == NULL)
|
||||
{
|
||||
fdw_tuplestore = GetCurrentFDWTuplestore();
|
||||
new_event.ate_flags = AFTER_TRIGGER_FDW_FETCH;
|
||||
}
|
||||
else
|
||||
/* subsequent event for the same tuple */
|
||||
new_event.ate_flags = AFTER_TRIGGER_FDW_REUSE;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the trigger is a foreign key enforcement trigger, there are
|
||||
* certain cases where we can skip queueing the event because we can
|
||||
@ -4717,6 +4961,19 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
afterTriggerAddEvent(&afterTriggers->query_stack[afterTriggers->query_depth],
|
||||
&new_event, &new_shared);
|
||||
}
|
||||
|
||||
/*
|
||||
* Finally, spool any foreign tuple(s). The tuplestore squashes them to
|
||||
* minimal tuples, so this loses any system columns. The executor lost
|
||||
* those columns before us, for an unrelated reason, so this is fine.
|
||||
*/
|
||||
if (fdw_tuplestore)
|
||||
{
|
||||
if (oldtup != NULL)
|
||||
tuplestore_puttuple(fdw_tuplestore, oldtup);
|
||||
if (newtup != NULL)
|
||||
tuplestore_puttuple(fdw_tuplestore, newtup);
|
||||
}
|
||||
}
|
||||
|
||||
Datum
|
||||
|
Reference in New Issue
Block a user