diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 3e148f03d0e..edbdce81f21 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -196,27 +196,17 @@ analyze_rel(Oid relid, RangeVar *relation, int options, } /* - * Check permissions --- this should match vacuum's check! + * Check if relation needs to be skipped based on ownership. This check + * happens also when building the relation list to analyze for a manual + * operation, and needs to be done additionally here as ANALYZE could + * happen across multiple transactions where relation ownership could have + * changed in-between. Make sure to generate only logs for ANALYZE in + * this case. */ - if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) || - (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared))) + if (!vacuum_is_relation_owner(RelationGetRelid(onerel), + onerel->rd_rel, + options & VACOPT_ANALYZE)) { - /* No need for a WARNING if we already complained during VACUUM */ - if (!(options & VACOPT_VACUUM)) - { - if (onerel->rd_rel->relisshared) - ereport(WARNING, - (errmsg("skipping \"%s\" --- only superuser can analyze it", - RelationGetRelationName(onerel)))); - else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE) - ereport(WARNING, - (errmsg("skipping \"%s\" --- only superuser or database owner can analyze it", - RelationGetRelationName(onerel)))); - else - ereport(WARNING, - (errmsg("skipping \"%s\" --- only table or database owner can analyze it", - RelationGetRelationName(onerel)))); - } relation_close(onerel, ShareUpdateExclusiveLock); return; } diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index ee32fe88711..f1665097346 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -68,8 +68,8 @@ static BufferAccessStrategy vac_strategy; /* non-export function prototypes */ -static List *expand_vacuum_rel(VacuumRelation *vrel); -static List *get_all_vacuum_rels(void); +static List *expand_vacuum_rel(VacuumRelation *vrel, int options); +static List *get_all_vacuum_rels(int options); static void vac_truncate_clog(TransactionId frozenXID, MultiXactId minMulti, TransactionId lastSaneFrozenXid, @@ -257,7 +257,7 @@ vacuum(int options, List *relations, VacuumParams *params, List *sublist; MemoryContext old_context; - sublist = expand_vacuum_rel(vrel); + sublist = expand_vacuum_rel(vrel, options); old_context = MemoryContextSwitchTo(vac_context); newrels = list_concat(newrels, sublist); MemoryContextSwitchTo(old_context); @@ -265,7 +265,7 @@ vacuum(int options, List *relations, VacuumParams *params, relations = newrels; } else - relations = get_all_vacuum_rels(); + relations = get_all_vacuum_rels(options); /* * Decide whether we need to start/commit our own transactions. @@ -408,6 +408,80 @@ vacuum(int options, List *relations, VacuumParams *params, vac_context = NULL; } +/* + * Check if a given relation can be safely vacuumed or analyzed. If the + * user is not the relation owner, issue a WARNING log message and return + * false to let the caller decide what to do with this relation. This + * routine is used to decide if a relation can be processed for VACUUM or + * ANALYZE. + */ +bool +vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, int options) +{ + char *relname; + + Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0); + + /* + * Check permissions. + * + * We allow the user to vacuum or analyze a table if he is superuser, the + * table owner, or the database owner (but in the latter case, only if + * it's not a shared relation). pg_class_ownercheck includes the + * superuser case. + * + * Note we choose to treat permissions failure as a WARNING and keep + * trying to vacuum or analyze the rest of the DB --- is this appropriate? + */ + if (pg_class_ownercheck(relid, GetUserId()) || + (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared)) + return true; + + relname = NameStr(reltuple->relname); + + if ((options & VACOPT_VACUUM) != 0) + { + if (reltuple->relisshared) + ereport(WARNING, + (errmsg("skipping \"%s\" --- only superuser can vacuum it", + relname))); + else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE) + ereport(WARNING, + (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it", + relname))); + else + ereport(WARNING, + (errmsg("skipping \"%s\" --- only table or database owner can vacuum it", + relname))); + + /* + * For VACUUM ANALYZE, both logs could show up, but just generate + * information for VACUUM as that would be the first one to be + * processed. + */ + return false; + } + + if ((options & VACOPT_ANALYZE) != 0) + { + if (reltuple->relisshared) + ereport(WARNING, + (errmsg("skipping \"%s\" --- only superuser can analyze it", + relname))); + else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE) + ereport(WARNING, + (errmsg("skipping \"%s\" --- only superuser or database owner can analyze it", + relname))); + else + ereport(WARNING, + (errmsg("skipping \"%s\" --- only table or database owner can analyze it", + relname))); + } + + return false; +} + + /* * Given a VacuumRelation, fill in the table OID if it wasn't specified, * and optionally add VacuumRelations for partitions of the table. @@ -423,7 +497,7 @@ vacuum(int options, List *relations, VacuumParams *params, * are made in vac_context. */ static List * -expand_vacuum_rel(VacuumRelation *vrel) +expand_vacuum_rel(VacuumRelation *vrel, int options) { List *vacrels = NIL; MemoryContext oldcontext; @@ -457,22 +531,28 @@ expand_vacuum_rel(VacuumRelation *vrel) relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false); /* - * Make a returnable VacuumRelation for this rel. - */ - oldcontext = MemoryContextSwitchTo(vac_context); - vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation, - relid, - vrel->va_cols)); - MemoryContextSwitchTo(oldcontext); - - /* - * To check whether the relation is a partitioned table, fetch its - * syscache entry. + * To check whether the relation is a partitioned table and its + * ownership, fetch its syscache entry. */ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for relation %u", relid); classForm = (Form_pg_class) GETSTRUCT(tuple); + + /* + * Make a returnable VacuumRelation for this rel if user is a proper + * owner. + */ + if (vacuum_is_relation_owner(relid, classForm, options)) + { + oldcontext = MemoryContextSwitchTo(vac_context); + vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation, + relid, + vrel->va_cols)); + MemoryContextSwitchTo(oldcontext); + } + + include_parts = (classForm->relkind == RELKIND_PARTITIONED_TABLE); ReleaseSysCache(tuple); @@ -481,7 +561,9 @@ expand_vacuum_rel(VacuumRelation *vrel) * the list returned by find_all_inheritors() includes the passed-in * OID, so we have to skip that. There's no point in taking locks on * the individual partitions yet, and doing so would just add - * unnecessary deadlock risk. + * unnecessary deadlock risk. For this last reason we do not check + * yet the ownership of the partitions, which get added to the list to + * process. Ownership will be checked later on anyway. */ if (include_parts) { @@ -530,7 +612,7 @@ expand_vacuum_rel(VacuumRelation *vrel) * the current database. The list is built in vac_context. */ static List * -get_all_vacuum_rels(void) +get_all_vacuum_rels(int options) { List *vacrels = NIL; Relation pgclass; @@ -545,6 +627,11 @@ get_all_vacuum_rels(void) { Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple); MemoryContext oldcontext; + Oid relid = HeapTupleGetOid(tuple); + + /* check permissions of relation */ + if (!vacuum_is_relation_owner(relid, classForm, options)) + continue; /* * We include partitioned tables here; depending on which operation is @@ -563,7 +650,7 @@ get_all_vacuum_rels(void) */ oldcontext = MemoryContextSwitchTo(vac_context); vacrels = lappend(vacrels, makeVacuumRelation(NULL, - HeapTupleGetOid(tuple), + relid, NIL)); MemoryContextSwitchTo(oldcontext); } @@ -1436,30 +1523,17 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) } /* - * Check permissions. - * - * We allow the user to vacuum a table if he is superuser, the table - * owner, or the database owner (but in the latter case, only if it's not - * a shared relation). pg_class_ownercheck includes the superuser case. - * - * Note we choose to treat permissions failure as a WARNING and keep - * trying to vacuum the rest of the DB --- is this appropriate? + * Check if relation needs to be skipped based on ownership. This check + * happens also when building the relation list to vacuum for a manual + * operation, and needs to be done additionally here as VACUUM could + * happen across multiple transactions where relation ownership could have + * changed in-between. Make sure to only generate logs for VACUUM in this + * case. */ - if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) || - (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared))) + if (!vacuum_is_relation_owner(RelationGetRelid(onerel), + onerel->rd_rel, + options & VACOPT_VACUUM)) { - if (onerel->rd_rel->relisshared) - ereport(WARNING, - (errmsg("skipping \"%s\" --- only superuser can vacuum it", - RelationGetRelationName(onerel)))); - else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE) - ereport(WARNING, - (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it", - RelationGetRelationName(onerel)))); - else - ereport(WARNING, - (errmsg("skipping \"%s\" --- only table or database owner can vacuum it", - RelationGetRelationName(onerel)))); relation_close(onerel, lmode); PopActiveSnapshot(); CommitTransactionCommand(); diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 85d472f0a54..5af96fdc8a2 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -15,6 +15,7 @@ #define VACUUM_H #include "access/htup.h" +#include "catalog/pg_class.h" #include "catalog/pg_statistic.h" #include "catalog/pg_type.h" #include "nodes/parsenodes.h" @@ -185,6 +186,8 @@ extern void vacuum_set_xid_limits(Relation rel, MultiXactId *mxactFullScanLimit); extern void vac_update_datfrozenxid(void); extern void vacuum_delay_point(void); +extern bool vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, + int options); /* in commands/vacuumlazy.c */ extern void lazy_vacuum_rel(Relation onerel, int options, diff --git a/src/test/isolation/expected/vacuum-conflict.out b/src/test/isolation/expected/vacuum-conflict.out new file mode 100644 index 00000000000..06ac75ef23b --- /dev/null +++ b/src/test/isolation/expected/vacuum-conflict.out @@ -0,0 +1,149 @@ +Parsed test spec with 2 sessions + +starting permutation: s1_begin s1_lock s2_auth s2_vacuum s1_commit s2_reset +step s1_begin: BEGIN; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s2_auth: SET ROLE regress_vacuum_conflict; +WARNING: skipping "vacuum_tab" --- only table or database owner can vacuum it +step s2_vacuum: VACUUM vacuum_tab; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_auth s2_vacuum s1_lock s1_commit s2_reset +step s1_begin: BEGIN; +step s2_auth: SET ROLE regress_vacuum_conflict; +WARNING: skipping "vacuum_tab" --- only table or database owner can vacuum it +step s2_vacuum: VACUUM vacuum_tab; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_auth s1_lock s2_vacuum s1_commit s2_reset +step s1_begin: BEGIN; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +WARNING: skipping "vacuum_tab" --- only table or database owner can vacuum it +step s2_vacuum: VACUUM vacuum_tab; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s2_auth s2_vacuum s1_begin s1_lock s1_commit s2_reset +step s2_auth: SET ROLE regress_vacuum_conflict; +WARNING: skipping "vacuum_tab" --- only table or database owner can vacuum it +step s2_vacuum: VACUUM vacuum_tab; +step s1_begin: BEGIN; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s1_lock s2_auth s2_analyze s1_commit s2_reset +step s1_begin: BEGIN; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s2_auth: SET ROLE regress_vacuum_conflict; +WARNING: skipping "vacuum_tab" --- only table or database owner can analyze it +step s2_analyze: ANALYZE vacuum_tab; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_auth s2_analyze s1_lock s1_commit s2_reset +step s1_begin: BEGIN; +step s2_auth: SET ROLE regress_vacuum_conflict; +WARNING: skipping "vacuum_tab" --- only table or database owner can analyze it +step s2_analyze: ANALYZE vacuum_tab; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_auth s1_lock s2_analyze s1_commit s2_reset +step s1_begin: BEGIN; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +WARNING: skipping "vacuum_tab" --- only table or database owner can analyze it +step s2_analyze: ANALYZE vacuum_tab; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s2_auth s2_analyze s1_begin s1_lock s1_commit s2_reset +step s2_auth: SET ROLE regress_vacuum_conflict; +WARNING: skipping "vacuum_tab" --- only table or database owner can analyze it +step s2_analyze: ANALYZE vacuum_tab; +step s1_begin: BEGIN; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_grant s1_lock s2_auth s2_vacuum s1_commit s2_reset +step s1_begin: BEGIN; +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s2_vacuum: VACUUM vacuum_tab; +step s1_commit: COMMIT; +step s2_vacuum: <... completed> +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_grant s2_auth s2_vacuum s1_lock s1_commit s2_reset +step s1_begin: BEGIN; +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s2_vacuum: VACUUM vacuum_tab; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_grant s2_auth s1_lock s2_vacuum s1_commit s2_reset +step s1_begin: BEGIN; +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s2_vacuum: VACUUM vacuum_tab; +step s1_commit: COMMIT; +step s2_vacuum: <... completed> +step s2_reset: RESET ROLE; + +starting permutation: s2_grant s2_auth s2_vacuum s1_begin s1_lock s1_commit s2_reset +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s2_vacuum: VACUUM vacuum_tab; +step s1_begin: BEGIN; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_grant s1_lock s2_auth s2_analyze s1_commit s2_reset +step s1_begin: BEGIN; +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s2_analyze: ANALYZE vacuum_tab; +step s1_commit: COMMIT; +step s2_analyze: <... completed> +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_grant s2_auth s2_analyze s1_lock s1_commit s2_reset +step s1_begin: BEGIN; +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s2_analyze: ANALYZE vacuum_tab; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; + +starting permutation: s1_begin s2_grant s2_auth s1_lock s2_analyze s1_commit s2_reset +step s1_begin: BEGIN; +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s2_analyze: ANALYZE vacuum_tab; +step s1_commit: COMMIT; +step s2_analyze: <... completed> +step s2_reset: RESET ROLE; + +starting permutation: s2_grant s2_auth s2_analyze s1_begin s1_lock s1_commit s2_reset +step s2_grant: ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; +step s2_auth: SET ROLE regress_vacuum_conflict; +step s2_analyze: ANALYZE vacuum_tab; +step s1_begin: BEGIN; +step s1_lock: LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; +step s1_commit: COMMIT; +step s2_reset: RESET ROLE; diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule index 48ae7407399..c23b401225d 100644 --- a/src/test/isolation/isolation_schedule +++ b/src/test/isolation/isolation_schedule @@ -66,6 +66,7 @@ test: async-notify test: vacuum-reltuples test: timeouts test: vacuum-concurrent-drop +test: vacuum-conflict test: predicate-hash test: predicate-gist test: predicate-gin diff --git a/src/test/isolation/specs/vacuum-conflict.spec b/src/test/isolation/specs/vacuum-conflict.spec new file mode 100644 index 00000000000..9b45d26c65b --- /dev/null +++ b/src/test/isolation/specs/vacuum-conflict.spec @@ -0,0 +1,51 @@ +# Tests for locking conflicts with VACUUM and ANALYZE commands. + +setup +{ + CREATE ROLE regress_vacuum_conflict; + CREATE TABLE vacuum_tab (a int); +} + +teardown +{ + DROP TABLE vacuum_tab; + DROP ROLE regress_vacuum_conflict; +} + +session "s1" +step "s1_begin" { BEGIN; } +step "s1_lock" { LOCK vacuum_tab IN SHARE UPDATE EXCLUSIVE MODE; } +step "s1_commit" { COMMIT; } + +session "s2" +step "s2_grant" { ALTER TABLE vacuum_tab OWNER TO regress_vacuum_conflict; } +step "s2_auth" { SET ROLE regress_vacuum_conflict; } +step "s2_vacuum" { VACUUM vacuum_tab; } +step "s2_analyze" { ANALYZE vacuum_tab; } +step "s2_reset" { RESET ROLE; } + +# The role doesn't have privileges to vacuum the table, so VACUUM should +# immediately skip the table without waiting for a lock. +permutation "s1_begin" "s1_lock" "s2_auth" "s2_vacuum" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_auth" "s2_vacuum" "s1_lock" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_auth" "s1_lock" "s2_vacuum" "s1_commit" "s2_reset" +permutation "s2_auth" "s2_vacuum" "s1_begin" "s1_lock" "s1_commit" "s2_reset" + +# Same as previously for ANALYZE +permutation "s1_begin" "s1_lock" "s2_auth" "s2_analyze" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_auth" "s2_analyze" "s1_lock" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_auth" "s1_lock" "s2_analyze" "s1_commit" "s2_reset" +permutation "s2_auth" "s2_analyze" "s1_begin" "s1_lock" "s1_commit" "s2_reset" + +# The role has privileges to vacuum the table, VACUUM will block if +# another session holds a lock on the table and succeed in all cases. +permutation "s1_begin" "s2_grant" "s1_lock" "s2_auth" "s2_vacuum" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_grant" "s2_auth" "s2_vacuum" "s1_lock" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_grant" "s2_auth" "s1_lock" "s2_vacuum" "s1_commit" "s2_reset" +permutation "s2_grant" "s2_auth" "s2_vacuum" "s1_begin" "s1_lock" "s1_commit" "s2_reset" + +# Same as previously for ANALYZE +permutation "s1_begin" "s2_grant" "s1_lock" "s2_auth" "s2_analyze" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_grant" "s2_auth" "s2_analyze" "s1_lock" "s1_commit" "s2_reset" +permutation "s1_begin" "s2_grant" "s2_auth" "s1_lock" "s2_analyze" "s1_commit" "s2_reset" +permutation "s2_grant" "s2_auth" "s2_analyze" "s1_begin" "s1_lock" "s1_commit" "s2_reset"