1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Fix MAINTAIN privileges for toast tables and partitions.

Commit 60684dd8 left loose ends when it came to maintaining toast
tables or partitions.

For toast tables, simply skip the privilege check if the toast table
is an indirect target of the maintenance command, because the main
table privileges have already been checked.

For partitions, allow the maintenance command if the user has the
MAINTAIN privilege on the partition or any parent.

Also make CLUSTER emit "skipping" messages when the user doesn't have
privileges, similar to VACUUM.

Author: Nathan Bossart
Reported-by: Pavel Luzanov
Reviewed-by: Pavel Luzanov, Ted Yu
Discussion: https://postgr.es/m/20230113231339.GA2422750@nathanxps13
This commit is contained in:
Jeff Davis
2023-01-13 15:32:37 -08:00
parent ff23b592ad
commit ff9618e82a
16 changed files with 126 additions and 64 deletions

View File

@@ -156,7 +156,10 @@ ANALYZE [ VERBOSE ] [ <replaceable class="parameter">table_and_columns</replacea
analyze all tables in their databases, except shared catalogs.
(The restriction for shared catalogs means that a true database-wide
<command>ANALYZE</command> can only be performed by superusers and roles
with privileges of <literal>pg_maintain</literal>.)
with privileges of <literal>pg_maintain</literal>.) If a role has
permission to <command>ANALYZE</command> a partitioned table, it is also
permitted to <command>ANALYZE</command> each of its partitions, regardless
of whether the role has the aforementioned privileges on the partition.
<command>ANALYZE</command> will skip over any tables that the calling user
does not have permission to analyze.
</para>

View File

@@ -69,10 +69,7 @@ CLUSTER [VERBOSE]
<para>
<command>CLUSTER</command> without any parameter reclusters all the
previously-clustered tables in the current database that the calling user
owns or has the <literal>MAINTAIN</literal> privilege for, or all such tables
if called by a superuser or a role with privileges of the
<link linkend="predefined-roles-table"><literal>pg_maintain</literal></link>
role. This form of <command>CLUSTER</command> cannot be
has privileges for. This form of <command>CLUSTER</command> cannot be
executed inside a transaction block.
</para>
@@ -134,6 +131,18 @@ CLUSTER [VERBOSE]
<refsect1>
<title>Notes</title>
<para>
To cluster a table, one must have the <literal>MAINTAIN</literal> privilege
on the table or be the table's owner, a superuser, or a role with
privileges of the
<link linkend="predefined-roles-table"><literal>pg_maintain</literal></link>
role. If a role has permission to <command>CLUSTER</command> a partitioned
table, it is also permitted to <command>CLUSTER</command> each of its
partitions, regardless of whether the role has the aforementioned
privileges on the partition. <command>CLUSTER</command> will skip over any
tables that the calling user does not have permission to cluster.
</para>
<para>
In cases where you are accessing single rows randomly
within a table, the actual order of the data in the

View File

@@ -177,7 +177,10 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ]
MODE</literal> (or a less-conflicting mode as described in <xref
linkend="explicit-locking"/>) is permitted. If a user has
<literal>SELECT</literal> privileges on the table, <literal>ACCESS SHARE
MODE</literal> is permitted.
MODE</literal> is permitted. If a role has permission to lock a
partitioned table, it is also permitted to lock each of its partitions,
regardless of whether the role has the aforementioned privileges on the
partition.
</para>
<para>

View File

@@ -306,7 +306,11 @@ REINDEX [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] { DA
indexes on shared catalogs will be skipped unless the user owns the
catalog (which typically won't be the case), has privileges of the
<literal>pg_maintain</literal> role, or has the <literal>MAINTAIN</literal>
privilege on the catalog. Of course, superusers can always reindex anything.
privilege on the catalog. If a role has permission to
<command>REINDEX</command> a partitioned table, it is also permitted to
<command>REINDEX</command> each of its partitions, regardless of whether the
role has the aforementioned privileges on the partition. Of course,
superusers can always reindex anything.
</para>
<para>

View File

@@ -401,7 +401,10 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
vacuum all tables in their databases, except shared catalogs.
(The restriction for shared catalogs means that a true database-wide
<command>VACUUM</command> can only be performed by superusers and roles
with privileges of <literal>pg_maintain</literal>.)
with privileges of <literal>pg_maintain</literal>.) If a role has
permission to <command>VACUUM</command> a partitioned table, it is also
permitted to <command>VACUUM</command> each of its partitions, regardless
of whether the role has the aforementioned privileges on the partition.
<command>VACUUM</command> will skip over any tables that the calling user
does not have permission to vacuum.
</para>

View File

@@ -80,6 +80,7 @@ static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
static List *get_tables_to_cluster(MemoryContext cluster_context);
static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
Oid indexOid);
static bool cluster_is_permitted_for_relation(Oid relid, Oid userid);
/*---------------------------------------------------------------------------
@@ -366,8 +367,7 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
if (recheck)
{
/* Check that the user still has privileges for the relation */
if (!object_ownercheck(RelationRelationId, tableOid, save_userid) &&
pg_class_aclcheck(tableOid, save_userid, ACL_MAINTAIN) != ACLCHECK_OK)
if (!cluster_is_permitted_for_relation(tableOid, save_userid))
{
relation_close(OldHeap, AccessExclusiveLock);
goto out;
@@ -1645,8 +1645,7 @@ get_tables_to_cluster(MemoryContext cluster_context)
index = (Form_pg_index) GETSTRUCT(indexTuple);
if (!object_ownercheck(RelationRelationId, index->indrelid, GetUserId()) &&
pg_class_aclcheck(index->indrelid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
if (!cluster_is_permitted_for_relation(index->indrelid, GetUserId()))
continue;
/* Use a permanent memory context for the result list */
@@ -1694,12 +1693,11 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
if (get_rel_relkind(indexrelid) != RELKIND_INDEX)
continue;
/* Silently skip partitions which the user has no access to. */
if (!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
(!object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) ||
IsSharedRelation(relid)))
continue;
/*
* We already checked that the user has privileges to CLUSTER the
* partitioned table when we locked it earlier, so there's no need to
* check the privileges again here.
*/
/* Use a permanent memory context for the result list */
old_context = MemoryContextSwitchTo(cluster_context);
@@ -1714,3 +1712,20 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
return rtcs;
}
/*
* Return whether userid has privileges to CLUSTER relid. If not, this
* function emits a WARNING.
*/
static bool
cluster_is_permitted_for_relation(Oid relid, Oid userid)
{
if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK ||
has_partition_ancestor_privs(relid, userid, ACL_MAINTAIN))
return true;
ereport(WARNING,
(errmsg("permission denied to cluster \"%s\", skipping it",
get_rel_name(relid))));
return false;
}

View File

@@ -2796,9 +2796,9 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation,
/* Check permissions */
table_oid = IndexGetRelation(relId, true);
if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
OidIsValid(table_oid) &&
pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
if (OidIsValid(table_oid) &&
pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
!has_partition_ancestor_privs(table_oid, GetUserId(), ACL_MAINTAIN))
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
relation->relname);
@@ -3008,16 +3008,16 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
/*
* The table can be reindexed if the user has been granted MAINTAIN on
* the table or the user is a superuser, the table owner, or the
* database/schema owner (but in the latter case, only if it's not a
* shared relation). object_ownercheck includes the superuser case,
* and depending on objectKind we already know that the user has
* permission to run REINDEX on this database or schema per the
* permission checks at the beginning of this routine.
* the table or one of its partition ancestors or the user is a
* superuser, the table owner, or the database/schema owner (but in the
* latter case, only if it's not a shared relation). pg_class_aclcheck
* includes the superuser case, and depending on objectKind we already
* know that the user has permission to run REINDEX on this database or
* schema per the permission checks at the beginning of this routine.
*/
if (classtuple->relisshared &&
!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
!has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
continue;
/*

View File

@@ -19,6 +19,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_inherits.h"
#include "commands/lockcmds.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_clause.h"
@@ -305,5 +306,12 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
aclresult = pg_class_aclcheck(reloid, userid, aclmask);
/*
* If this is a partition, check permissions of its ancestors if needed.
*/
if (aclresult != ACLCHECK_OK &&
has_partition_ancestor_privs(reloid, userid, ACL_MAINTAIN))
aclresult = ACLCHECK_OK;
return aclresult;
}

View File

@@ -16886,12 +16886,38 @@ RangeVarCallbackMaintainsTable(const RangeVar *relation,
errmsg("\"%s\" is not a table or materialized view", relation->relname)));
/* Check permissions */
if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
if (pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
!has_partition_ancestor_privs(relId, GetUserId(), ACL_MAINTAIN))
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLE,
relation->relname);
}
/*
* If relid is a partition, returns whether userid has any of the privileges
* specified in acl on any of its ancestors. Otherwise, returns false.
*/
bool
has_partition_ancestor_privs(Oid relid, Oid userid, AclMode acl)
{
List *ancestors;
ListCell *lc;
if (!get_rel_relispartition(relid))
return false;
ancestors = get_partition_ancestors(relid);
foreach(lc, ancestors)
{
Oid ancestor = lfirst_oid(lc);
if (OidIsValid(ancestor) &&
pg_class_aclcheck(ancestor, userid, acl) == ACLCHECK_OK)
return true;
}
return false;
}
/*
* Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
*/

View File

@@ -41,6 +41,7 @@
#include "catalog/pg_namespace.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
@@ -91,7 +92,8 @@ static void vac_truncate_clog(TransactionId frozenXID,
MultiXactId minMulti,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
bool skip_privs);
static double compute_parallel_delay(void);
static VacOptValue get_vacoptval_from_boolean(DefElem *def);
static bool vac_tid_reaped(ItemPointer itemptr, void *state);
@@ -501,7 +503,7 @@ vacuum(List *relations, VacuumParams *params,
if (params->options & VACOPT_VACUUM)
{
if (!vacuum_rel(vrel->oid, vrel->relation, params))
if (!vacuum_rel(vrel->oid, vrel->relation, params, false))
continue;
}
@@ -598,11 +600,13 @@ vacuum_is_permitted_for_relation(Oid relid, Form_pg_class reltuple,
* - the role owns the relation
* - the role owns the current database and the relation is not shared
* - the role has been granted the MAINTAIN privilege on the relation
* - the role has privileges to vacuum/analyze any of the relation's
* partition ancestors
*----------
*/
if (object_ownercheck(RelationRelationId, relid, GetUserId()) ||
(object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK)
if ((object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK ||
has_partition_ancestor_privs(relid, GetUserId(), ACL_MAINTAIN))
return true;
relname = NameStr(reltuple->relname);
@@ -1828,7 +1832,7 @@ vac_truncate_clog(TransactionId frozenXID,
* At entry and exit, we are not inside a transaction.
*/
static bool
vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, bool skip_privs)
{
LOCKMODE lmode;
Relation rel;
@@ -1915,7 +1919,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
* happen across multiple transactions where privileges could have changed
* in-between. Make sure to only generate logs for VACUUM in this case.
*/
if (!vacuum_is_permitted_for_relation(RelationGetRelid(rel),
if (!skip_privs &&
!vacuum_is_permitted_for_relation(RelationGetRelid(rel),
rel->rd_rel,
params->options & VACOPT_VACUUM))
{
@@ -2089,7 +2094,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
* totally unimportant for toast relations.
*/
if (toast_relid != InvalidOid)
vacuum_rel(toast_relid, NULL, params);
vacuum_rel(toast_relid, NULL, params, true);
/*
* Now release the session-level lock on the main table.

View File

@@ -98,6 +98,7 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit,
extern void RangeVarCallbackMaintainsTable(const RangeVar *relation,
Oid relId, Oid oldRelId,
void *arg);
extern bool has_partition_ancestor_privs(Oid relid, Oid userid, AclMode acl);
extern void RangeVarCallbackOwnsRelation(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);

View File

@@ -22,14 +22,16 @@ starting permutation: s1_begin s1_lock_child s2_auth s2_cluster s1_commit s2_res
step s1_begin: BEGIN;
step s1_lock_child: LOCK cluster_part_tab1 IN SHARE UPDATE EXCLUSIVE MODE;
step s2_auth: SET ROLE regress_cluster_part;
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind;
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind; <waiting ...>
step s1_commit: COMMIT;
step s2_cluster: <... completed>
step s2_reset: RESET ROLE;
starting permutation: s1_begin s2_auth s1_lock_child s2_cluster s1_commit s2_reset
step s1_begin: BEGIN;
step s2_auth: SET ROLE regress_cluster_part;
step s1_lock_child: LOCK cluster_part_tab1 IN SHARE UPDATE EXCLUSIVE MODE;
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind;
step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind; <waiting ...>
step s1_commit: COMMIT;
step s2_cluster: <... completed>
step s2_reset: RESET ROLE;

View File

@@ -27,11 +27,8 @@ step s2_auth { SET ROLE regress_cluster_part; }
step s2_cluster { CLUSTER cluster_part_tab USING cluster_part_ind; }
step s2_reset { RESET ROLE; }
# CLUSTER on the parent waits if locked, passes for all cases.
# CLUSTER waits if locked, passes for all cases.
permutation s1_begin s1_lock_parent s2_auth s2_cluster s1_commit s2_reset
permutation s1_begin s2_auth s1_lock_parent s2_cluster s1_commit s2_reset
# When taking a lock on a partition leaf, CLUSTER on the parent skips
# the leaf, passes for all cases.
permutation s1_begin s1_lock_child s2_auth s2_cluster s1_commit s2_reset
permutation s1_begin s2_auth s1_lock_child s2_cluster s1_commit s2_reset

View File

@@ -352,7 +352,9 @@ INSERT INTO clstr_3 VALUES (1);
-- this user can only cluster clstr_1 and clstr_3, but the latter
-- has not been clustered
SET SESSION AUTHORIZATION regress_clstr_user;
SET client_min_messages = ERROR; -- order of "skipping" warnings may vary
CLUSTER;
RESET client_min_messages;
SELECT * FROM clstr_1 UNION ALL
SELECT * FROM clstr_2 UNION ALL
SELECT * FROM clstr_3;
@@ -513,7 +515,7 @@ SELECT a.relname, a.relfilenode=b.relfilenode FROM pg_class a
-----------+----------
ptnowner | t
ptnowner1 | f
ptnowner2 | t
ptnowner2 | f
(3 rows)
DROP TABLE ptnowner;

View File

@@ -353,20 +353,14 @@ ALTER TABLE vacowned_parted OWNER TO regress_vacuum;
ALTER TABLE vacowned_part1 OWNER TO regress_vacuum;
SET ROLE regress_vacuum;
VACUUM vacowned_parted;
WARNING: permission denied to vacuum "vacowned_part2", skipping it
VACUUM vacowned_part1;
VACUUM vacowned_part2;
WARNING: permission denied to vacuum "vacowned_part2", skipping it
ANALYZE vacowned_parted;
WARNING: permission denied to analyze "vacowned_part2", skipping it
ANALYZE vacowned_part1;
ANALYZE vacowned_part2;
WARNING: permission denied to analyze "vacowned_part2", skipping it
VACUUM (ANALYZE) vacowned_parted;
WARNING: permission denied to vacuum "vacowned_part2", skipping it
VACUUM (ANALYZE) vacowned_part1;
VACUUM (ANALYZE) vacowned_part2;
WARNING: permission denied to vacuum "vacowned_part2", skipping it
RESET ROLE;
-- Only one partition owned by other user.
ALTER TABLE vacowned_parted OWNER TO CURRENT_USER;
@@ -395,26 +389,14 @@ ALTER TABLE vacowned_parted OWNER TO regress_vacuum;
ALTER TABLE vacowned_part1 OWNER TO CURRENT_USER;
SET ROLE regress_vacuum;
VACUUM vacowned_parted;
WARNING: permission denied to vacuum "vacowned_part1", skipping it
WARNING: permission denied to vacuum "vacowned_part2", skipping it
VACUUM vacowned_part1;
WARNING: permission denied to vacuum "vacowned_part1", skipping it
VACUUM vacowned_part2;
WARNING: permission denied to vacuum "vacowned_part2", skipping it
ANALYZE vacowned_parted;
WARNING: permission denied to analyze "vacowned_part1", skipping it
WARNING: permission denied to analyze "vacowned_part2", skipping it
ANALYZE vacowned_part1;
WARNING: permission denied to analyze "vacowned_part1", skipping it
ANALYZE vacowned_part2;
WARNING: permission denied to analyze "vacowned_part2", skipping it
VACUUM (ANALYZE) vacowned_parted;
WARNING: permission denied to vacuum "vacowned_part1", skipping it
WARNING: permission denied to vacuum "vacowned_part2", skipping it
VACUUM (ANALYZE) vacowned_part1;
WARNING: permission denied to vacuum "vacowned_part1", skipping it
VACUUM (ANALYZE) vacowned_part2;
WARNING: permission denied to vacuum "vacowned_part2", skipping it
RESET ROLE;
DROP TABLE vacowned;
DROP TABLE vacowned_parted;

View File

@@ -145,7 +145,9 @@ INSERT INTO clstr_3 VALUES (1);
-- this user can only cluster clstr_1 and clstr_3, but the latter
-- has not been clustered
SET SESSION AUTHORIZATION regress_clstr_user;
SET client_min_messages = ERROR; -- order of "skipping" warnings may vary
CLUSTER;
RESET client_min_messages;
SELECT * FROM clstr_1 UNION ALL
SELECT * FROM clstr_2 UNION ALL
SELECT * FROM clstr_3;