diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index d080297512d..5585e9b6b9b 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -27,6 +27,7 @@ #include "bootstrap/bootstrap.h" #include "catalog/catalog.h" #include "catalog/heap.h" +#include "catalog/namespace.h" #include "catalog/pg_am.h" #include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" @@ -282,6 +283,7 @@ Boot_DeclareIndexStmt: XDECLARE INDEX boot_ident oidspec ON boot_ident USING boot_ident LPAREN boot_index_params RPAREN { IndexStmt *stmt = makeNode(IndexStmt); + Oid relationId; do_start(); @@ -303,7 +305,12 @@ Boot_DeclareIndexStmt: stmt->initdeferred = false; stmt->concurrent = false; - DefineIndex(stmt, + /* locks and races need not concern us in bootstrap mode */ + relationId = RangeVarGetRelid(stmt->relation, NoLock, + false); + + DefineIndex(relationId, + stmt, $4, false, false, @@ -317,6 +324,7 @@ Boot_DeclareUniqueIndexStmt: XDECLARE UNIQUE INDEX boot_ident oidspec ON boot_ident USING boot_ident LPAREN boot_index_params RPAREN { IndexStmt *stmt = makeNode(IndexStmt); + Oid relationId; do_start(); @@ -338,7 +346,12 @@ Boot_DeclareUniqueIndexStmt: stmt->initdeferred = false; stmt->concurrent = false; - DefineIndex(stmt, + /* locks and races need not concern us in bootstrap mode */ + relationId = RangeVarGetRelid(stmt->relation, NoLock, + false); + + DefineIndex(relationId, + stmt, $5, false, false, diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 8eae43ddb3d..cebca95ac8d 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1215,18 +1215,13 @@ index_constraint_create(Relation heapRelation, */ if (deferrable) { - RangeVar *heapRel; CreateTrigStmt *trigger; - heapRel = makeRangeVar(get_namespace_name(namespaceId), - pstrdup(RelationGetRelationName(heapRelation)), - -1); - trigger = makeNode(CreateTrigStmt); trigger->trigname = (constraintType == CONSTRAINT_PRIMARY) ? "PK_ConstraintTrigger" : "Unique_ConstraintTrigger"; - trigger->relation = heapRel; + trigger->relation = NULL; trigger->funcname = SystemFuncName("unique_key_recheck"); trigger->args = NIL; trigger->row = true; @@ -1239,7 +1234,8 @@ index_constraint_create(Relation heapRelation, trigger->initdeferred = initdeferred; trigger->constrrel = NULL; - (void) CreateTrigger(trigger, NULL, conOid, indexRelationId, true); + (void) CreateTrigger(trigger, NULL, RelationGetRelid(heapRelation), + InvalidOid, conOid, indexRelationId, true); } /* diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index f2209c82103..5fd9822c6ed 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -751,6 +751,25 @@ AlterConstraintNamespaces(Oid ownerId, Oid oldNspId, heap_close(conRel, RowExclusiveLock); } +/* + * get_constraint_relation_oids + * Find the IDs of the relations to which a constraint refers. + */ +void +get_constraint_relation_oids(Oid constraint_oid, Oid *conrelid, Oid *confrelid) +{ + HeapTuple tup; + Form_pg_constraint con; + + tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraint_oid)); + if (!HeapTupleIsValid(tup)) /* should not happen */ + elog(ERROR, "cache lookup failed for constraint %u", constraint_oid); + con = (Form_pg_constraint) GETSTRUCT(tup); + *conrelid = con->conrelid; + *confrelid = con->confrelid; + ReleaseSysCache(tup); +} + /* * get_relation_constraint_oid * Find a constraint on the specified relation with the specified name. diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 4259c4757a6..38ce023a8a2 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -112,7 +112,6 @@ static void RangeVarCallbackForReindexIndex(const RangeVar *relation, */ bool CheckIndexCompatible(Oid oldId, - RangeVar *heapRelation, char *accessMethodName, List *attributeList, List *exclusionOpNames) @@ -140,7 +139,7 @@ CheckIndexCompatible(Oid oldId, Datum d; /* Caller should already have the relation locked in some way. */ - relationId = RangeVarGetRelid(heapRelation, NoLock, false); + relationId = IndexGetRelation(oldId, false); /* * We can pretend isconstraint = false unconditionally. It only serves to @@ -280,6 +279,8 @@ CheckIndexCompatible(Oid oldId, * DefineIndex * Creates a new index. * + * 'relationId': the OID of the heap relation on which the index is to be + * created * 'stmt': IndexStmt describing the properties of the new index. * 'indexRelationId': normally InvalidOid, but during bootstrap can be * nonzero to specify a preselected OID for the index. @@ -293,7 +294,8 @@ CheckIndexCompatible(Oid oldId, * Returns the OID of the created index. */ Oid -DefineIndex(IndexStmt *stmt, +DefineIndex(Oid relationId, + IndexStmt *stmt, Oid indexRelationId, bool is_alter_table, bool check_rights, @@ -306,7 +308,6 @@ DefineIndex(IndexStmt *stmt, Oid *collationObjectId; Oid *classObjectId; Oid accessMethodId; - Oid relationId; Oid namespaceId; Oid tablespaceId; List *indexColNames; @@ -325,6 +326,7 @@ DefineIndex(IndexStmt *stmt, int n_old_snapshots; LockRelId heaprelid; LOCKTAG heaplocktag; + LOCKMODE lockmode; Snapshot snapshot; int i; @@ -343,14 +345,18 @@ DefineIndex(IndexStmt *stmt, INDEX_MAX_KEYS))); /* - * Open heap relation, acquire a suitable lock on it, remember its OID - * * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE * (but not VACUUM). + * + * NB: Caller is responsible for making sure that relationId refers + * to the relation on which the index should be built; except in bootstrap + * mode, this will typically require the caller to have already locked + * the relation. To avoid lock upgrade hazards, that lock should be at + * least as strong as the one we take here. */ - rel = heap_openrv(stmt->relation, - (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock)); + lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock; + rel = heap_open(relationId, lockmode); relationId = RelationGetRelid(rel); namespaceId = RelationGetNamespace(rel); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 08b037e501f..fe0c0aca291 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -296,7 +296,8 @@ static void validateCheckConstraint(Relation rel, HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname, Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid); -static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, +static void createForeignKeyTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, Oid indexOid); static void ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, @@ -373,8 +374,9 @@ static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, static void ATExecAlterColumnGenericOptions(Relation rel, const char *colName, List *options, LOCKMODE lockmode); static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode); -static void ATPostAlterTypeParse(Oid oldId, char *cmd, - List **wqueue, LOCKMODE lockmode, bool rewrite); +static void ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, + char *cmd, List **wqueue, LOCKMODE lockmode, + bool rewrite); static void TryReuseIndex(Oid oldId, IndexStmt *stmt); static void TryReuseForeignKey(Oid oldId, Constraint *con); static void change_owner_fix_column_acls(Oid relationOid, @@ -5539,7 +5541,8 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, /* The IndexStmt has already been through transformIndexStmt */ - new_index = DefineIndex(stmt, + new_index = DefineIndex(RelationGetRelid(rel), + stmt, InvalidOid, /* no predefined OID */ true, /* is_alter_table */ check_rights, @@ -5863,7 +5866,10 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, * table; trying to start with a lesser lock will just create a risk of * deadlock.) */ - pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); + if (OidIsValid(fkconstraint->old_pktable_oid)) + pkrel = heap_open(fkconstraint->old_pktable_oid, AccessExclusiveLock); + else + pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); /* * Validity checks (permission checks wait till we have the column @@ -6202,7 +6208,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * Create the triggers that will enforce the constraint. */ - createForeignKeyTriggers(rel, fkconstraint, constrOid, indexOid); + createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, + constrOid, indexOid); /* * Tell Phase 3 to check that the constraint is satisfied by existing @@ -7012,7 +7019,7 @@ validateForeignKeyConstraint(char *conname, } static void -CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, +CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint, Oid constraintOid, Oid indexOid, bool on_insert) { CreateTrigStmt *fk_trigger; @@ -7028,7 +7035,7 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = "RI_ConstraintTrigger_c"; - fk_trigger->relation = myRel; + fk_trigger->relation = NULL; fk_trigger->row = true; fk_trigger->timing = TRIGGER_TYPE_AFTER; @@ -7049,10 +7056,11 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, fk_trigger->isconstraint = true; fk_trigger->deferrable = fkconstraint->deferrable; fk_trigger->initdeferred = fkconstraint->initdeferred; - fk_trigger->constrrel = fkconstraint->pktable; + fk_trigger->constrrel = NULL; fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true); + (void) CreateTrigger(fk_trigger, NULL, myRelOid, refRelOid, constraintOid, + indexOid, true); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -7062,18 +7070,13 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, * Create the triggers that implement an FK constraint. */ static void -createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, +createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, Oid constraintOid, Oid indexOid) { - RangeVar *myRel; + Oid myRelOid; CreateTrigStmt *fk_trigger; - /* - * Reconstruct a RangeVar for my relation (not passed in, unfortunately). - */ - myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), - pstrdup(RelationGetRelationName(rel)), - -1); + myRelOid = RelationGetRelid(rel); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -7084,14 +7087,14 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = "RI_ConstraintTrigger_a"; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->row = true; fk_trigger->timing = TRIGGER_TYPE_AFTER; fk_trigger->events = TRIGGER_TYPE_DELETE; fk_trigger->columns = NIL; fk_trigger->whenClause = NULL; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_del_action) { case FKCONSTR_ACTION_NOACTION: @@ -7126,7 +7129,8 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true); + (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, + indexOid, true); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -7137,14 +7141,14 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = "RI_ConstraintTrigger_a"; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->row = true; fk_trigger->timing = TRIGGER_TYPE_AFTER; fk_trigger->events = TRIGGER_TYPE_UPDATE; fk_trigger->columns = NIL; fk_trigger->whenClause = NULL; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_upd_action) { case FKCONSTR_ACTION_NOACTION: @@ -7179,7 +7183,8 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true); + (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, + indexOid, true); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -7188,8 +7193,10 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, * Build and execute CREATE CONSTRAINT TRIGGER statements for the CHECK * action for both INSERTs and UPDATEs on the referencing table. */ - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, indexOid, true); - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, indexOid, false); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid, + indexOid, true); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid, + indexOid, false); } /* @@ -8093,15 +8100,36 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) * lock on the table the constraint is attached to, and we need to get * that before dropping. It's safe because the parser won't actually look * at the catalogs to detect the existing entry. + * + * We can't rely on the output of deparsing to tell us which relation + * to operate on, because concurrent activity might have made the name + * resolve differently. Instead, we've got to use the OID of the + * constraint or index we're processing to figure out which relation + * to operate on. */ forboth(oid_item, tab->changedConstraintOids, def_item, tab->changedConstraintDefs) - ATPostAlterTypeParse(lfirst_oid(oid_item), (char *) lfirst(def_item), + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + Oid confrelid; + + get_constraint_relation_oids(oldId, &relid, &confrelid); + ATPostAlterTypeParse(oldId, relid, confrelid, + (char *) lfirst(def_item), wqueue, lockmode, tab->rewrite); + } forboth(oid_item, tab->changedIndexOids, def_item, tab->changedIndexDefs) - ATPostAlterTypeParse(lfirst_oid(oid_item), (char *) lfirst(def_item), + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + + relid = IndexGetRelation(oldId, false); + ATPostAlterTypeParse(oldId, relid, InvalidOid, + (char *) lfirst(def_item), wqueue, lockmode, tab->rewrite); + } /* * Now we can drop the existing constraints and indexes --- constraints @@ -8134,12 +8162,13 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) } static void -ATPostAlterTypeParse(Oid oldId, char *cmd, +ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, List **wqueue, LOCKMODE lockmode, bool rewrite) { List *raw_parsetree_list; List *querytree_list; ListCell *list_item; + Relation rel; /* * We expect that we will get only ALTER TABLE and CREATE INDEX @@ -8155,16 +8184,21 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, if (IsA(stmt, IndexStmt)) querytree_list = lappend(querytree_list, - transformIndexStmt((IndexStmt *) stmt, + transformIndexStmt(oldRelId, + (IndexStmt *) stmt, cmd)); else if (IsA(stmt, AlterTableStmt)) querytree_list = list_concat(querytree_list, - transformAlterTableStmt((AlterTableStmt *) stmt, + transformAlterTableStmt(oldRelId, + (AlterTableStmt *) stmt, cmd)); else querytree_list = lappend(querytree_list, stmt); } + /* Caller should already have acquired whatever lock we need. */ + rel = relation_open(oldRelId, NoLock); + /* * Attach each generated command to the proper place in the work queue. * Note this could result in creation of entirely new work-queue entries. @@ -8176,7 +8210,6 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, foreach(list_item, querytree_list) { Node *stm = (Node *) lfirst(list_item); - Relation rel; AlteredTableInfo *tab; switch (nodeTag(stm)) @@ -8189,14 +8222,12 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, if (!rewrite) TryReuseIndex(oldId, stmt); - rel = relation_openrv(stmt->relation, lockmode); tab = ATGetQueueEntry(wqueue, rel); newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_ReAddIndex; newcmd->def = (Node *) stmt; tab->subcmds[AT_PASS_OLD_INDEX] = lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd); - relation_close(rel, NoLock); break; } case T_AlterTableStmt: @@ -8204,7 +8235,6 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, AlterTableStmt *stmt = (AlterTableStmt *) stm; ListCell *lcmd; - rel = relation_openrv(stmt->relation, lockmode); tab = ATGetQueueEntry(wqueue, rel); foreach(lcmd, stmt->cmds) { @@ -8225,6 +8255,7 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, case AT_AddConstraint: Assert(IsA(cmd->def, Constraint)); con = (Constraint *) cmd->def; + con->old_pktable_oid = refRelId; /* rewriting neither side of a FK */ if (con->contype == CONSTR_FOREIGN && !rewrite && !tab->rewrite) @@ -8238,7 +8269,6 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, (int) cmd->subtype); } } - relation_close(rel, NoLock); break; } default: @@ -8246,6 +8276,8 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, (int) nodeTag(stm)); } } + + relation_close(rel, NoLock); } /* @@ -8256,7 +8288,6 @@ static void TryReuseIndex(Oid oldId, IndexStmt *stmt) { if (CheckIndexCompatible(oldId, - stmt->relation, stmt->accessMethod, stmt->indexParams, stmt->excludeOpNames)) @@ -10879,6 +10910,38 @@ RangeVarCallbackOwnsTable(const RangeVar *relation, aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); } +/* + * Callback to RangeVarGetRelidExtended(), similar to + * RangeVarCallbackOwnsTable() but without checks on the type of the relation. + */ +void +RangeVarCallbackOwnsRelation(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + HeapTuple tuple; + + /* Nothing to do if the relation was not found. */ + if (!OidIsValid(relId)) + return; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId)); + if (!HeapTupleIsValid(tuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for relation %u", relId); + + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + relation->relname); + + if (!allowSystemTableMods && + IsSystemClass(relId, (Form_pg_class) GETSTRUCT(tuple))) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + relation->relname))); + + ReleaseSysCache(tuple); +} + /* * Common RangeVarGetRelid callback for rename, set schema, and alter table * processing. diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 86449a641cc..fa74bd2cf10 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -43,6 +43,7 @@ #include "pgstat.h" #include "rewrite/rewriteManip.h" #include "storage/bufmgr.h" +#include "storage/lmgr.h" #include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -96,6 +97,13 @@ static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, * queryString is the source text of the CREATE TRIGGER command. * This must be supplied if a whenClause is specified, else it can be NULL. * + * relOid, if nonzero, is the relation on which the trigger should be + * created. If zero, the name provided in the statement will be looked up. + * + * refRelOid, if nonzero, is the relation to which the constraint trigger + * refers. If zero, the constraint relation name provided in the statement + * will be looked up as needed. + * * constraintOid, if nonzero, says that this trigger is being created * internally to implement that constraint. A suitable pg_depend entry will * be made to link the trigger to that constraint. constraintOid is zero when @@ -118,7 +126,7 @@ static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, */ Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString, - Oid constraintOid, Oid indexOid, + Oid relOid, Oid refRelOid, Oid constraintOid, Oid indexOid, bool isInternal) { int16 tgtype; @@ -147,7 +155,10 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, ObjectAddress myself, referenced; - rel = heap_openrv(stmt->relation, AccessExclusiveLock); + if (OidIsValid(relOid)) + rel = heap_open(relOid, AccessExclusiveLock); + else + rel = heap_openrv(stmt->relation, AccessExclusiveLock); /* * Triggers must be on tables or views, and there are additional @@ -196,7 +207,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, errmsg("permission denied: \"%s\" is a system catalog", RelationGetRelationName(rel)))); - if (stmt->isconstraint && stmt->constrrel != NULL) + if (stmt->isconstraint) { /* * We must take a lock on the target relation to protect against @@ -205,7 +216,14 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, * might end up creating a pg_constraint entry referencing a * nonexistent table. */ - constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false); + if (OidIsValid(refRelOid)) + { + LockRelationOid(refRelOid, AccessShareLock); + constrrelid = refRelOid; + } + else if (stmt->constrrel != NULL) + constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, + false); } /* permission checks */ @@ -501,7 +519,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("trigger \"%s\" for relation \"%s\" already exists", - trigname, stmt->relation->relname))); + trigname, RelationGetRelationName(rel)))); } systable_endscan(tgscan); } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 6a59703025b..c89d8083c67 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -2411,6 +2411,7 @@ _copyConstraint(const Constraint *from) COPY_SCALAR_FIELD(fk_upd_action); COPY_SCALAR_FIELD(fk_del_action); COPY_NODE_FIELD(old_conpfeqop); + COPY_SCALAR_FIELD(old_pktable_oid); COPY_SCALAR_FIELD(skip_validation); COPY_SCALAR_FIELD(initially_valid); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 0bcbf42bfc4..9793cf52a8c 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2239,6 +2239,7 @@ _equalConstraint(const Constraint *a, const Constraint *b) COMPARE_SCALAR_FIELD(fk_upd_action); COMPARE_SCALAR_FIELD(fk_del_action); COMPARE_NODE_FIELD(old_conpfeqop); + COMPARE_SCALAR_FIELD(old_pktable_oid); COMPARE_SCALAR_FIELD(skip_validation); COMPARE_SCALAR_FIELD(initially_valid); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 568c3b826ce..bfb4b9fb03d 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -2709,6 +2709,7 @@ _outConstraint(StringInfo str, const Constraint *node) WRITE_CHAR_FIELD(fk_upd_action); WRITE_CHAR_FIELD(fk_del_action); WRITE_NODE_FIELD(old_conpfeqop); + WRITE_OID_FIELD(old_pktable_oid); WRITE_BOOL_FIELD(skip_validation); WRITE_BOOL_FIELD(initially_valid); break; diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index eb07ca3d374..1e071d7908b 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -1903,14 +1903,18 @@ transformFKConstraints(CreateStmtContext *cxt, * a predicate expression. There are several code paths that create indexes * without bothering to call this, because they know they don't have any * such expressions to deal with. + * + * To avoid race conditions, it's important that this function rely only on + * the passed-in relid (and not on stmt->relation) to determine the target + * relation. */ IndexStmt * -transformIndexStmt(IndexStmt *stmt, const char *queryString) +transformIndexStmt(Oid relid, IndexStmt *stmt, const char *queryString) { - Relation rel; ParseState *pstate; RangeTblEntry *rte; ListCell *l; + Relation rel; /* * We must not scribble on the passed-in IndexStmt, so copy it. (This is @@ -1918,26 +1922,17 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString) */ stmt = (IndexStmt *) copyObject(stmt); - /* - * Open the parent table with appropriate locking. We must do this - * because addRangeTableEntry() would acquire only AccessShareLock, - * leaving DefineIndex() needing to do a lock upgrade with consequent risk - * of deadlock. Make sure this stays in sync with the type of lock - * DefineIndex() wants. If we are being called by ALTER TABLE, we will - * already hold a higher lock. - */ - rel = heap_openrv(stmt->relation, - (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock)); - /* Set up pstate */ pstate = make_parsestate(NULL); pstate->p_sourcetext = queryString; /* * Put the parent table into the rtable so that the expressions can refer - * to its fields without qualification. + * to its fields without qualification. Caller is responsible for locking + * relation, but we still need to open it. */ - rte = addRangeTableEntry(pstate, stmt->relation, NULL, false, true); + rel = relation_open(relid, NoLock); + rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, true); /* no to join list, yes to namespaces */ addRTEtoQuery(pstate, rte, false, true, true); @@ -1998,7 +1993,7 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString) free_parsestate(pstate); - /* Close relation, but keep the lock */ + /* Close relation */ heap_close(rel, NoLock); return stmt; @@ -2317,9 +2312,14 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString, * Returns a List of utility commands to be done in sequence. One of these * will be the transformed AlterTableStmt, but there may be additional actions * to be done before and after the actual AlterTable() call. + * + * To avoid race conditions, it's important that this function rely only on + * the passed-in relid (and not on stmt->relation) to determine the target + * relation. */ List * -transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) +transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, + const char *queryString) { Relation rel; ParseState *pstate; @@ -2331,7 +2331,6 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) List *newcmds = NIL; bool skipValidation = true; AlterTableCmd *newcmd; - LOCKMODE lockmode; /* * We must not scribble on the passed-in AlterTableStmt, so copy it. (This @@ -2339,29 +2338,8 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) */ stmt = (AlterTableStmt *) copyObject(stmt); - /* - * Determine the appropriate lock level for this list of subcommands. - */ - lockmode = AlterTableGetLockLevel(stmt->cmds); - - /* - * Acquire appropriate lock on the target relation, which will be held - * until end of transaction. This ensures any decisions we make here - * based on the state of the relation will still be good at execution. We - * must get lock now because execution will later require it; taking a - * lower grade lock now and trying to upgrade later risks deadlock. Any - * new commands we add after this must not upgrade the lock level - * requested here. - */ - rel = relation_openrv_extended(stmt->relation, lockmode, stmt->missing_ok); - if (rel == NULL) - { - /* this message is consistent with relation_openrv */ - ereport(NOTICE, - (errmsg("relation \"%s\" does not exist, skipping", - stmt->relation->relname))); - return NIL; - } + /* Caller is responsible for locking the relation */ + rel = relation_open(relid, NoLock); /* Set up pstate and CreateStmtContext */ pstate = make_parsestate(NULL); @@ -2483,7 +2461,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) IndexStmt *idxstmt = (IndexStmt *) lfirst(l); Assert(IsA(idxstmt, IndexStmt)); - idxstmt = transformIndexStmt(idxstmt, queryString); + idxstmt = transformIndexStmt(relid, idxstmt, queryString); newcmd = makeNode(AlterTableCmd); newcmd->subtype = OidIsValid(idxstmt->indexOid) ? AT_AddIndexConstraint : AT_AddIndex; newcmd->def = (Node *) idxstmt; @@ -2507,7 +2485,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) newcmds = lappend(newcmds, newcmd); } - /* Close rel but keep lock */ + /* Close rel */ relation_close(rel, NoLock); /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index f4d25bd1edd..d1621add7d8 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -78,49 +78,6 @@ static void ProcessUtilitySlow(Node *parsetree, static void ExecDropStmt(DropStmt *stmt, bool isTopLevel); -/* - * Verify user has ownership of specified relation, else ereport. - * - * If noCatalogs is true then we also deny access to system catalogs, - * except when allowSystemTableMods is true. - */ -void -CheckRelationOwnership(RangeVar *rel, bool noCatalogs) -{ - Oid relOid; - HeapTuple tuple; - - /* - * XXX: This is unsafe in the presence of concurrent DDL, since it is - * called before acquiring any lock on the target relation. However, - * locking the target relation (especially using something like - * AccessExclusiveLock) before verifying that the user has permissions is - * not appealing either. - */ - relOid = RangeVarGetRelid(rel, NoLock, false); - - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); - if (!HeapTupleIsValid(tuple)) /* should not happen */ - elog(ERROR, "cache lookup failed for relation %u", relOid); - - if (!pg_class_ownercheck(relOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - rel->relname); - - if (noCatalogs) - { - if (!allowSystemTableMods && - IsSystemClass(relOid, (Form_pg_class) GETSTRUCT(tuple))) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - rel->relname))); - } - - ReleaseSysCache(tuple); -} - - /* * CommandIsReadOnly: is an executable query read-only? * @@ -1019,7 +976,8 @@ ProcessUtilitySlow(Node *parsetree, if (OidIsValid(relid)) { /* Run parse analysis ... */ - stmts = transformAlterTableStmt(atstmt, queryString); + stmts = transformAlterTableStmt(relid, atstmt, + queryString); /* ... and do it */ foreach(l, stmts) @@ -1160,18 +1118,36 @@ ProcessUtilitySlow(Node *parsetree, case T_IndexStmt: /* CREATE INDEX */ { IndexStmt *stmt = (IndexStmt *) parsetree; + Oid relid; + LOCKMODE lockmode; if (stmt->concurrent) PreventTransactionChain(isTopLevel, "CREATE INDEX CONCURRENTLY"); - CheckRelationOwnership(stmt->relation, true); + /* + * Look up the relation OID just once, right here at the + * beginning, so that we don't end up repeating the name + * lookup later and latching onto a different relation + * partway through. To avoid lock upgrade hazards, it's + * important that we take the strongest lock that will + * eventually be needed here, so the lockmode calculation + * needs to match what DefineIndex() does. + */ + lockmode = stmt->concurrent ? ShareUpdateExclusiveLock + : ShareLock; + relid = + RangeVarGetRelidExtended(stmt->relation, lockmode, + false, false, + RangeVarCallbackOwnsRelation, + NULL); /* Run parse analysis ... */ - stmt = transformIndexStmt(stmt, queryString); + stmt = transformIndexStmt(relid, stmt, queryString); /* ... and do it */ - DefineIndex(stmt, + DefineIndex(relid, /* OID of heap relation */ + stmt, InvalidOid, /* no predefined OID */ false, /* is_alter_table */ true, /* check_rights */ @@ -1276,7 +1252,8 @@ ProcessUtilitySlow(Node *parsetree, case T_CreateTrigStmt: (void) CreateTrigger((CreateTrigStmt *) parsetree, queryString, - InvalidOid, InvalidOid, false); + InvalidOid, InvalidOid, InvalidOid, + InvalidOid, false); break; case T_CreatePLangStmt: diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index a6b2a6deff0..1f01637b2e7 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -247,6 +247,7 @@ extern char *ChooseConstraintName(const char *name1, const char *name2, extern void AlterConstraintNamespaces(Oid ownerId, Oid oldNspId, Oid newNspId, bool isType, ObjectAddresses *objsMoved); +extern void get_constraint_relation_oids(Oid constraint_oid, Oid *conrelid, Oid *confrelid); extern Oid get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok); extern Oid get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok); diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 0b1b2b7b78c..5ec9374aad0 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -21,7 +21,8 @@ extern void RemoveObjects(DropStmt *stmt); /* commands/indexcmds.c */ -extern Oid DefineIndex(IndexStmt *stmt, +extern Oid DefineIndex(Oid relationId, + IndexStmt *stmt, Oid indexRelationId, bool is_alter_table, bool check_rights, @@ -36,7 +37,6 @@ extern char *makeObjectName(const char *name1, const char *name2, extern char *ChooseRelationName(const char *name1, const char *name2, const char *label, Oid namespaceid); extern bool CheckIndexCompatible(Oid oldId, - RangeVar *heapRelation, char *accessMethodName, List *attributeList, List *exclusionOpNames); diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index f90fca541e2..5c0518ce13b 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -78,4 +78,6 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit, extern void RangeVarCallbackOwnsTable(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); +extern void RangeVarCallbackOwnsRelation(const RangeVar *relation, + Oid relId, Oid oldRelId, void *noCatalogs); #endif /* TABLECMDS_H */ diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 44d686c6838..18cb128ed4d 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -109,7 +109,7 @@ extern PGDLLIMPORT int SessionReplicationRole; #define TRIGGER_DISABLED 'D' extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString, - Oid constraintOid, Oid indexOid, + Oid relOid, Oid refRelOid, Oid constraintOid, Oid indexOid, bool isInternal); extern void RemoveTriggerById(Oid trigOid); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index f649ad4aa07..b5011af3e7f 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1652,6 +1652,7 @@ typedef struct Constraint char fk_upd_action; /* ON UPDATE action */ char fk_del_action; /* ON DELETE action */ List *old_conpfeqop; /* pg_constraint.conpfeqop of my former self */ + Oid old_pktable_oid; /* pg_constraint.confrelid of my former self */ /* Fields used for constraints that allow a NOT VALID specification */ bool skip_validation; /* skip validation of existing rows? */ diff --git a/src/include/parser/parse_utilcmd.h b/src/include/parser/parse_utilcmd.h index 3454727fbe3..ffbfbf0c30e 100644 --- a/src/include/parser/parse_utilcmd.h +++ b/src/include/parser/parse_utilcmd.h @@ -18,9 +18,10 @@ extern List *transformCreateStmt(CreateStmt *stmt, const char *queryString); -extern List *transformAlterTableStmt(AlterTableStmt *stmt, +extern List *transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, const char *queryString); -extern IndexStmt *transformIndexStmt(IndexStmt *stmt, const char *queryString); +extern IndexStmt *transformIndexStmt(Oid relid, IndexStmt *stmt, + const char *queryString); extern void transformRuleStmt(RuleStmt *stmt, const char *queryString, List **actions, Node **whereClause); extern List *transformCreateSchemaStmt(CreateSchemaStmt *stmt); diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h index a449bd0a533..3ecf931bead 100644 --- a/src/include/tcop/utility.h +++ b/src/include/tcop/utility.h @@ -49,6 +49,4 @@ extern LogStmtLevel GetCommandLogLevel(Node *parsetree); extern bool CommandIsReadOnly(Node *parsetree); -extern void CheckRelationOwnership(RangeVar *rel, bool noCatalogs); - #endif /* UTILITY_H */