1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-13 07:41:39 +03:00

Add a WHEN clause to CREATE TRIGGER, allowing a boolean expression to be

checked to determine whether the trigger should be fired.

For BEFORE triggers this is mostly a matter of spec compliance; but for AFTER
triggers it can provide a noticeable performance improvement, since queuing of
a deferred trigger event and re-fetching of the row(s) at end of statement can
be short-circuited if the trigger does not need to be fired.

Takahiro Itagaki, reviewed by KaiGai Kohei.
This commit is contained in:
Tom Lane
2009-11-20 20:38:12 +00:00
parent 201a45c4fa
commit 7fc0f06221
27 changed files with 783 additions and 100 deletions

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.256 2009/10/27 20:14:27 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.257 2009/11/20 20:38:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -32,10 +32,14 @@
#include "miscadmin.h"
#include "nodes/bitmapset.h"
#include "nodes/makefuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/var.h"
#include "parser/parse_clause.h"
#include "parser/parse_func.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "pgstat.h"
#include "rewrite/rewriteManip.h"
#include "storage/bufmgr.h"
#include "tcop/utility.h"
#include "utils/acl.h"
@ -65,21 +69,27 @@ static HeapTuple GetTupleForTrigger(EState *estate,
ResultRelInfo *relinfo,
ItemPointer tid,
TupleTableSlot **newSlot);
static bool TriggerEnabled(Trigger *trigger, TriggerEvent event,
Bitmapset *modifiedCols);
static bool TriggerEnabled(EState *estate, ResultRelInfo *relinfo,
Trigger *trigger, TriggerEvent event,
Bitmapset *modifiedCols,
HeapTuple oldtup, HeapTuple newtup);
static HeapTuple ExecCallTriggerFunc(TriggerData *trigdata,
int tgindx,
FmgrInfo *finfo,
Instrumentation *instr,
MemoryContext per_tuple_context);
static void AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event,
bool row_trigger, HeapTuple oldtup, HeapTuple newtup,
static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int event, bool row_trigger,
HeapTuple oldtup, HeapTuple newtup,
List *recheckIndexes, Bitmapset *modifiedCols);
/*
* Create a trigger. Returns the OID of the created trigger.
*
* queryString is the source text of the CREATE TRIGGER command.
* This must be supplied if a whenClause is specified, else it can be NULL.
*
* constraintOid, if nonzero, says that this trigger is being created
* internally to implement that constraint. A suitable pg_depend entry will
* be made to link the trigger to that constraint. constraintOid is zero when
@ -101,7 +111,7 @@ static void AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event,
* but a foreign-key constraint. This is a kluge for backwards compatibility.
*/
Oid
CreateTrigger(CreateTrigStmt *stmt,
CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
Oid constraintOid, Oid indexOid, const char *prefix,
bool checkPermissions)
{
@ -109,6 +119,9 @@ CreateTrigger(CreateTrigStmt *stmt,
int ncolumns;
int2 *columns;
int2vector *tgattr;
Node *whenClause;
List *whenRtable;
char *qual;
Datum values[Natts_pg_trigger];
bool nulls[Natts_pg_trigger];
Relation rel;
@ -179,6 +192,120 @@ CreateTrigger(CreateTrigStmt *stmt,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("TRUNCATE FOR EACH ROW triggers are not supported")));
/*
* Parse the WHEN clause, if any
*/
if (stmt->whenClause)
{
ParseState *pstate;
RangeTblEntry *rte;
List *varList;
ListCell *lc;
/* Set up a pstate to parse with */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
/*
* Set up RTEs for OLD and NEW references.
*
* 'OLD' must always have varno equal to 1 and 'NEW' equal to 2.
*/
rte = addRangeTableEntryForRelation(pstate, rel,
makeAlias("old", NIL),
false, false);
addRTEtoQuery(pstate, rte, false, true, true);
rte = addRangeTableEntryForRelation(pstate, rel,
makeAlias("new", NIL),
false, false);
addRTEtoQuery(pstate, rte, false, true, true);
/* Transform expression. Copy to be sure we don't modify original */
whenClause = transformWhereClause(pstate,
copyObject(stmt->whenClause),
"WHEN");
/*
* No subplans or aggregates, please
*/
if (pstate->p_hasSubLinks)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use subquery in trigger WHEN condition")));
if (pstate->p_hasAggs)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in trigger WHEN condition")));
if (pstate->p_hasWindowFuncs)
ereport(ERROR,
(errcode(ERRCODE_WINDOWING_ERROR),
errmsg("cannot use window function in trigger WHEN condition")));
/*
* Check for disallowed references to OLD/NEW.
*
* NB: pull_var_clause is okay here only because we don't allow
* subselects in WHEN clauses; it would fail to examine the contents
* of subselects.
*/
varList = pull_var_clause(whenClause, PVC_REJECT_PLACEHOLDERS);
foreach(lc, varList)
{
Var *var = (Var *) lfirst(lc);
switch (var->varno)
{
case PRS2_OLD_VARNO:
if (!TRIGGER_FOR_ROW(tgtype))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("statement trigger's WHEN condition cannot reference column values"),
parser_errposition(pstate, var->location)));
if (TRIGGER_FOR_INSERT(tgtype))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("INSERT trigger's WHEN condition cannot reference OLD values"),
parser_errposition(pstate, var->location)));
/* system columns are okay here */
break;
case PRS2_NEW_VARNO:
if (!TRIGGER_FOR_ROW(tgtype))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("statement trigger's WHEN condition cannot reference column values"),
parser_errposition(pstate, var->location)));
if (TRIGGER_FOR_DELETE(tgtype))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("DELETE trigger's WHEN condition cannot reference NEW values"),
parser_errposition(pstate, var->location)));
if (var->varattno < 0 && TRIGGER_FOR_BEFORE(tgtype))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("BEFORE trigger's WHEN condition cannot reference NEW system columns"),
parser_errposition(pstate, var->location)));
break;
default:
/* can't happen without add_missing_from, so just elog */
elog(ERROR, "trigger WHEN condition cannot contain references to other relations");
break;
}
}
/* we'll need the rtable for recordDependencyOnExpr */
whenRtable = pstate->p_rtable;
qual = nodeToString(whenClause);
free_parsestate(pstate);
}
else
{
whenClause = NULL;
whenRtable = NIL;
qual = NULL;
}
/*
* Find and validate the trigger function.
*/
@ -387,6 +514,12 @@ CreateTrigger(CreateTrigStmt *stmt,
tgattr = buildint2vector(columns, ncolumns);
values[Anum_pg_trigger_tgattr - 1] = PointerGetDatum(tgattr);
/* set tgqual if trigger has WHEN clause */
if (qual)
values[Anum_pg_trigger_tgqual - 1] = CStringGetTextDatum(qual);
else
nulls[Anum_pg_trigger_tgqual - 1] = true;
tuple = heap_form_tuple(tgrel->rd_att, values, nulls);
/* force tuple to have the desired OID */
@ -495,6 +628,14 @@ CreateTrigger(CreateTrigStmt *stmt,
}
}
/*
* If it has a WHEN clause, add dependencies on objects mentioned in
* the expression (eg, functions, as well as any columns used).
*/
if (whenClause != NULL)
recordDependencyOnExpr(&myself, whenClause, whenRtable,
DEPENDENCY_NORMAL);
/* Keep lock on target rel until end of xact */
heap_close(rel, NoLock);
@ -1184,6 +1325,8 @@ RelationBuildTriggers(Relation relation)
{
Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
Trigger *build;
Datum datum;
bool isnull;
if (numtrigs >= maxtrigs)
{
@ -1218,7 +1361,6 @@ RelationBuildTriggers(Relation relation)
if (build->tgnargs > 0)
{
bytea *val;
bool isnull;
char *p;
val = DatumGetByteaP(fastgetattr(htup,
@ -1237,6 +1379,12 @@ RelationBuildTriggers(Relation relation)
}
else
build->tgargs = NULL;
datum = fastgetattr(htup, Anum_pg_trigger_tgqual,
tgrel->rd_att, &isnull);
if (!isnull)
build->tgqual = TextDatumGetCString(datum);
else
build->tgqual = NULL;
numtrigs++;
}
@ -1396,6 +1544,8 @@ CopyTriggerDesc(TriggerDesc *trigdesc)
newargs[j] = pstrdup(trigger->tgargs[j]);
trigger->tgargs = newargs;
}
if (trigger->tgqual)
trigger->tgqual = pstrdup(trigger->tgqual);
trigger++;
}
@ -1497,6 +1647,8 @@ FreeTriggerDesc(TriggerDesc *trigdesc)
pfree(trigger->tgargs[trigger->tgnargs]);
pfree(trigger->tgargs);
}
if (trigger->tgqual)
pfree(trigger->tgqual);
trigger++;
}
pfree(trigdesc->triggers);
@ -1520,6 +1672,11 @@ equalTriggerDescs(TriggerDesc *trigdesc1, TriggerDesc *trigdesc2)
*
* As of 7.3 we assume trigger set ordering is significant in the
* comparison; so we just compare corresponding slots of the two sets.
*
* Note: comparing the stringToNode forms of the WHEN clauses means that
* parse column locations will affect the result. This is okay as long
* as this function is only used for detecting exact equality, as for
* example in checking for staleness of a cache entry.
*/
if (trigdesc1 != NULL)
{
@ -1565,6 +1722,12 @@ equalTriggerDescs(TriggerDesc *trigdesc1, TriggerDesc *trigdesc2)
for (j = 0; j < trig1->tgnargs; j++)
if (strcmp(trig1->tgargs[j], trig2->tgargs[j]) != 0)
return false;
if (trig1->tgqual == NULL && trig2->tgqual == NULL)
/* ok */ ;
else if (trig1->tgqual == NULL || trig2->tgqual == NULL)
return false;
else if (strcmp(trig1->tgqual, trig2->tgqual) != 0)
return false;
}
}
else if (trigdesc2 != NULL)
@ -1687,7 +1850,8 @@ ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
HeapTuple newtuple;
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, NULL))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
NULL, NULL, NULL))
continue;
LocTriggerData.tg_trigger = trigger;
@ -1710,7 +1874,7 @@ ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo)
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc && trigdesc->n_after_statement[TRIGGER_EVENT_INSERT] > 0)
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_INSERT,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_INSERT,
false, NULL, NULL, NIL, NULL);
}
@ -1737,7 +1901,8 @@ ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo,
{
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, NULL))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
NULL, NULL, newtuple))
continue;
LocTriggerData.tg_trigtuple = oldtuple = newtuple;
@ -1763,7 +1928,7 @@ ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo,
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc && trigdesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_INSERT,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_INSERT,
true, NULL, trigtuple, recheckIndexes, NULL);
}
@ -1800,7 +1965,8 @@ ExecBSDeleteTriggers(EState *estate, ResultRelInfo *relinfo)
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
HeapTuple newtuple;
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, NULL))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
NULL, NULL, NULL))
continue;
LocTriggerData.tg_trigger = trigger;
@ -1823,7 +1989,7 @@ ExecASDeleteTriggers(EState *estate, ResultRelInfo *relinfo)
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc && trigdesc->n_after_statement[TRIGGER_EVENT_DELETE] > 0)
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_DELETE,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
false, NULL, NULL, NIL, NULL);
}
@ -1858,7 +2024,8 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate,
{
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, NULL))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
NULL, trigtuple, NULL))
continue;
LocTriggerData.tg_trigtuple = trigtuple;
@ -1893,7 +2060,7 @@ ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo,
HeapTuple trigtuple = GetTupleForTrigger(estate, NULL, relinfo,
tupleid, NULL);
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_DELETE,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
true, trigtuple, NULL, NIL, NULL);
heap_freetuple(trigtuple);
}
@ -1935,7 +2102,8 @@ ExecBSUpdateTriggers(EState *estate, ResultRelInfo *relinfo)
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
HeapTuple newtuple;
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, modifiedCols))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
modifiedCols, NULL, NULL))
continue;
LocTriggerData.tg_trigger = trigger;
@ -1958,7 +2126,7 @@ ExecASUpdateTriggers(EState *estate, ResultRelInfo *relinfo)
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc && trigdesc->n_after_statement[TRIGGER_EVENT_UPDATE] > 0)
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_UPDATE,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
false, NULL, NULL, NIL,
GetModifiedColumns(relinfo, estate));
}
@ -2003,7 +2171,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
{
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, modifiedCols))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
modifiedCols, trigtuple, newtuple))
continue;
LocTriggerData.tg_trigtuple = trigtuple;
@ -2037,7 +2206,7 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
HeapTuple trigtuple = GetTupleForTrigger(estate, NULL, relinfo,
tupleid, NULL);
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_UPDATE,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
true, trigtuple, newtuple, recheckIndexes,
GetModifiedColumns(relinfo, estate));
heap_freetuple(trigtuple);
@ -2077,7 +2246,8 @@ ExecBSTruncateTriggers(EState *estate, ResultRelInfo *relinfo)
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
HeapTuple newtuple;
if (!TriggerEnabled(trigger, LocTriggerData.tg_event, NULL))
if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
NULL, NULL, NULL))
continue;
LocTriggerData.tg_trigger = trigger;
@ -2100,7 +2270,7 @@ ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo)
TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
if (trigdesc && trigdesc->n_after_statement[TRIGGER_EVENT_TRUNCATE] > 0)
AfterTriggerSaveEvent(relinfo, TRIGGER_EVENT_TRUNCATE,
AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_TRUNCATE,
false, NULL, NULL, NIL, NULL);
}
@ -2219,7 +2389,10 @@ ltrmark:;
* Is trigger enabled to fire?
*/
static bool
TriggerEnabled(Trigger *trigger, TriggerEvent event, Bitmapset *modifiedCols)
TriggerEnabled(EState *estate, ResultRelInfo *relinfo,
Trigger *trigger, TriggerEvent event,
Bitmapset *modifiedCols,
HeapTuple oldtup, HeapTuple newtup)
{
/* Check replication-role-dependent enable state */
if (SessionReplicationRole == SESSION_REPLICATION_ROLE_REPLICA)
@ -2258,6 +2431,94 @@ TriggerEnabled(Trigger *trigger, TriggerEvent event, Bitmapset *modifiedCols)
return false;
}
/* Check for WHEN clause */
if (trigger->tgqual)
{
TupleDesc tupdesc = RelationGetDescr(relinfo->ri_RelationDesc);
List **predicate;
ExprContext *econtext;
TupleTableSlot *oldslot = NULL;
TupleTableSlot *newslot = NULL;
MemoryContext oldContext;
int i;
Assert(estate != NULL);
/*
* trigger is an element of relinfo->ri_TrigDesc->triggers[];
* find the matching element of relinfo->ri_TrigWhenExprs[]
*/
i = trigger - relinfo->ri_TrigDesc->triggers;
predicate = &relinfo->ri_TrigWhenExprs[i];
/*
* If first time through for this WHEN expression, build expression
* nodetrees for it. Keep them in the per-query memory context so
* they'll survive throughout the query.
*/
if (*predicate == NIL)
{
Node *tgqual;
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
tgqual = stringToNode(trigger->tgqual);
/* Change references to OLD and NEW to INNER and OUTER */
ChangeVarNodes(tgqual, PRS2_OLD_VARNO, INNER, 0);
ChangeVarNodes(tgqual, PRS2_NEW_VARNO, OUTER, 0);
/* ExecQual wants implicit-AND form */
tgqual = (Node *) make_ands_implicit((Expr *) tgqual);
*predicate = (List *) ExecPrepareExpr((Expr *) tgqual, estate);
MemoryContextSwitchTo(oldContext);
}
/*
* We will use the EState's per-tuple context for evaluating WHEN
* expressions (creating it if it's not already there).
*/
econtext = GetPerTupleExprContext(estate);
/*
* Put OLD and NEW tuples into tupleslots for expression evaluation.
* These slots can be shared across the whole estate, but be careful
* that they have the current resultrel's tupdesc.
*/
if (HeapTupleIsValid(oldtup))
{
if (estate->es_trig_oldtup_slot == NULL)
{
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
estate->es_trig_oldtup_slot = ExecInitExtraTupleSlot(estate);
MemoryContextSwitchTo(oldContext);
}
oldslot = estate->es_trig_oldtup_slot;
if (oldslot->tts_tupleDescriptor != tupdesc)
ExecSetSlotDescriptor(oldslot, tupdesc);
ExecStoreTuple(oldtup, oldslot, InvalidBuffer, false);
}
if (HeapTupleIsValid(newtup))
{
if (estate->es_trig_tuple_slot == NULL)
{
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
MemoryContextSwitchTo(oldContext);
}
newslot = estate->es_trig_tuple_slot;
if (newslot->tts_tupleDescriptor != tupdesc)
ExecSetSlotDescriptor(newslot, tupdesc);
ExecStoreTuple(newtup, newslot, InvalidBuffer, false);
}
/*
* Finally evaluate the expression, making the old and/or new tuples
* available as INNER/OUTER respectively.
*/
econtext->ecxt_innertuple = oldslot;
econtext->ecxt_outertuple = newslot;
if (!ExecQual(*predicate, econtext, false))
return false;
}
return true;
}
@ -3883,7 +4144,8 @@ AfterTriggerPendingOnRel(Oid relid)
* ----------
*/
static void
AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event, bool row_trigger,
AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int event, bool row_trigger,
HeapTuple oldtup, HeapTuple newtup,
List *recheckIndexes, Bitmapset *modifiedCols)
{
@ -3993,7 +4255,8 @@ AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event, bool row_trigger,
{
Trigger *trigger = &trigdesc->triggers[tgindx[i]];
if (!TriggerEnabled(trigger, event, modifiedCols))
if (!TriggerEnabled(estate, relinfo, trigger, event,
modifiedCols, oldtup, newtup))
continue;
/*