1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Make ALTER TRIGGER RENAME consistent for partitioned tables

Renaming triggers on partitioned tables had two problems: first,
it did not recurse to renaming the triggers on the partitions; and
second, it failed to prohibit renaming clone triggers.  Having triggers
with different names in partitions is pointless, and furthermore pg_dump
would not preserve names for partitions anyway.

Not backpatched -- making the ALTER TRIGGER throw an error in stable
versions might cause problems for existing scripts.

Co-authored-by: Arne Roland <A.Roland@index.de>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Discussion: https://postgr.es/m/d0fd7040c2fb4de1a111b9d9ccc456b8@index.de
This commit is contained in:
Alvaro Herrera
2021-07-22 18:33:47 -04:00
parent 73c5d2bfee
commit 80ba4bb383
4 changed files with 308 additions and 46 deletions

View File

@ -71,6 +71,12 @@ int SessionReplicationRole = SESSION_REPLICATION_ROLE_ORIGIN;
static int MyTriggerDepth = 0;
/* Local function prototypes */
static void renametrig_internal(Relation tgrel, Relation targetrel,
HeapTuple trigtup, const char *newname,
const char *expected_name);
static void renametrig_partition(Relation tgrel, Oid partitionId,
Oid parentTriggerOid, const char *newname,
const char *expected_name);
static void SetTriggerFlags(TriggerDesc *trigdesc, Trigger *trigger);
static bool GetTupleForTrigger(EState *estate,
EPQState *epqstate,
@ -1442,38 +1448,16 @@ renametrig(RenameStmt *stmt)
targetrel = relation_open(relid, NoLock);
/*
* Scan pg_trigger twice for existing triggers on relation. We do this in
* order to ensure a trigger does not exist with newname (The unique index
* on tgrelid/tgname would complain anyway) and to ensure a trigger does
* exist with oldname.
*
* NOTE that this is cool only because we have AccessExclusiveLock on the
* relation, so the trigger set won't be changing underneath us.
* On partitioned tables, this operation recurses to partitions. Lock all
* tables upfront.
*/
if (targetrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
(void) find_all_inheritors(relid, AccessExclusiveLock, NULL);
tgrel = table_open(TriggerRelationId, RowExclusiveLock);
/*
* First pass -- look for name conflict
*/
ScanKeyInit(&key[0],
Anum_pg_trigger_tgrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relid));
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
PointerGetDatum(stmt->newname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
NULL, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("trigger \"%s\" for relation \"%s\" already exists",
stmt->newname, RelationGetRelationName(targetrel))));
systable_endscan(tgscan);
/*
* Second pass -- look for trigger existing with oldname and update
* Search for the trigger to modify.
*/
ScanKeyInit(&key[0],
Anum_pg_trigger_tgrelid,
@ -1489,27 +1473,40 @@ renametrig(RenameStmt *stmt)
{
Form_pg_trigger trigform;
/*
* Update pg_trigger tuple with new tgname.
*/
tuple = heap_copytuple(tuple); /* need a modifiable copy */
trigform = (Form_pg_trigger) GETSTRUCT(tuple);
tgoid = trigform->oid;
namestrcpy(&trigform->tgname,
stmt->newname);
CatalogTupleUpdate(tgrel, &tuple->t_self, tuple);
InvokeObjectPostAlterHook(TriggerRelationId,
tgoid, 0);
/*
* Invalidate relation's relcache entry so that other backends (and
* this one too!) are sent SI message to make them rebuild relcache
* entries. (Ideally this should happen automatically...)
* If the trigger descends from a trigger on a parent partitioned
* table, reject the rename. We don't allow a trigger in a partition
* to differ in name from that of its parent: that would lead to an
* inconsistency that pg_dump would not reproduce.
*/
CacheInvalidateRelcache(targetrel);
if (OidIsValid(trigform->tgparentid))
ereport(ERROR,
errmsg("cannot rename trigger \"%s\" on table \"%s\"",
stmt->subname, RelationGetRelationName(targetrel)),
errhint("Rename trigger on partitioned table \"%s\" instead.",
get_rel_name(get_partition_parent(relid, false))));
/* Rename the trigger on this relation ... */
renametrig_internal(tgrel, targetrel, tuple, stmt->newname,
stmt->subname);
/* ... and if it is partitioned, recurse to its partitions */
if (targetrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc partdesc = RelationGetPartitionDesc(targetrel, true);
for (int i = 0; i < partdesc->nparts; i++)
{
Oid partitionId = partdesc->oids[i];
renametrig_partition(tgrel, partitionId, trigform->oid,
stmt->newname, stmt->subname);
}
}
}
else
{
@ -1533,6 +1530,137 @@ renametrig(RenameStmt *stmt)
return address;
}
/*
* Subroutine for renametrig -- perform the actual work of renaming one
* trigger on one table.
*
* If the trigger has a name different from the expected one, raise a
* NOTICE about it.
*/
static void
renametrig_internal(Relation tgrel, Relation targetrel, HeapTuple trigtup,
const char *newname, const char *expected_name)
{
HeapTuple tuple;
Form_pg_trigger tgform;
ScanKeyData key[2];
SysScanDesc tgscan;
/* If the trigger already has the new name, nothing to do. */
tgform = (Form_pg_trigger) GETSTRUCT(trigtup);
if (strcmp(NameStr(tgform->tgname), newname) == 0)
return;
/*
* Before actually trying the rename, search for triggers with the same
* name. The update would fail with an ugly message in that case, and it
* is better to throw a nicer error.
*/
ScanKeyInit(&key[0],
Anum_pg_trigger_tgrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(targetrel)));
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
PointerGetDatum(newname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
NULL, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("trigger \"%s\" for relation \"%s\" already exists",
newname, RelationGetRelationName(targetrel))));
systable_endscan(tgscan);
/*
* The target name is free; update the existing pg_trigger tuple with it.
*/
tuple = heap_copytuple(trigtup); /* need a modifiable copy */
tgform = (Form_pg_trigger) GETSTRUCT(tuple);
/*
* If the trigger has a name different from what we expected, let the user
* know. (We can proceed anyway, since we must have reached here following
* a tgparentid link.)
*/
if (strcmp(NameStr(tgform->tgname), expected_name) != 0)
ereport(NOTICE,
errmsg("renamed trigger \"%s\" on relation \"%s\"",
NameStr(tgform->tgname),
RelationGetRelationName(targetrel)));
namestrcpy(&tgform->tgname, newname);
CatalogTupleUpdate(tgrel, &tuple->t_self, tuple);
InvokeObjectPostAlterHook(TriggerRelationId, tgform->oid, 0);
/*
* Invalidate relation's relcache entry so that other backends (and this
* one too!) are sent SI message to make them rebuild relcache entries.
* (Ideally this should happen automatically...)
*/
CacheInvalidateRelcache(targetrel);
}
/*
* Subroutine for renametrig -- Helper for recursing to partitions when
* renaming triggers on a partitioned table.
*/
static void
renametrig_partition(Relation tgrel, Oid partitionId, Oid parentTriggerOid,
const char *newname, const char *expected_name)
{
SysScanDesc tgscan;
ScanKeyData key;
HeapTuple tuple;
int found PG_USED_FOR_ASSERTS_ONLY = 0;
/*
* Given a relation and the OID of a trigger on parent relation, find the
* corresponding trigger in the child and rename that trigger to the given
* name.
*/
ScanKeyInit(&key,
Anum_pg_trigger_tgrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(partitionId));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
NULL, 1, &key);
while (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
{
Form_pg_trigger tgform = (Form_pg_trigger) GETSTRUCT(tuple);
Relation partitionRel;
if (tgform->tgparentid != parentTriggerOid)
continue; /* not our trigger */
Assert(found++ <= 0);
partitionRel = table_open(partitionId, NoLock);
/* Rename the trigger on this partition */
renametrig_internal(tgrel, partitionRel, tuple, newname, expected_name);
/* And if this relation is partitioned, recurse to its partitions */
if (partitionRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc partdesc = RelationGetPartitionDesc(partitionRel,
true);
for (int i = 0; i < partdesc->nparts; i++)
{
Oid partitionId = partdesc->oids[i];
renametrig_partition(tgrel, partitionId, tgform->oid, newname,
NameStr(tgform->tgname));
}
}
table_close(partitionRel, NoLock);
}
systable_endscan(tgscan);
}
/*
* EnableDisableTrigger()