1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-19 15:49:24 +03:00

Respect permissions within logical replication.

Prevent logical replication workers from performing insert, update,
delete, truncate, or copy commands on tables unless the subscription
owner has permission to do so.

Prevent subscription owners from circumventing row-level security by
forbidding replication into tables with row-level security policies
which the subscription owner is subject to, without regard to whether
the policy would ordinarily allow the INSERT, UPDATE, DELETE or
TRUNCATE which is being replicated.  This seems sufficient for now, as
superusers, roles with bypassrls, and target table owners should still
be able to replicate despite RLS policies.  We can revisit the
question of applying row-level security policies on a per-row basis if
this restriction proves too severe in practice.

Author: Mark Dilger
Reviewed-by: Jeff Davis, Andrew Dunstan, Ronan Dunklau
Discussion: https://postgr.es/m/9DFC88D3-1300-4DE8-ACBC-4CEF84399A53%40enterprisedb.com
This commit is contained in:
Jeff Davis
2022-01-07 17:38:20 -08:00
parent d0d62262d3
commit a2ab9c06ea
6 changed files with 499 additions and 8 deletions

View File

@@ -179,6 +179,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
@@ -189,6 +190,7 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
@@ -1530,6 +1532,38 @@ GetRelationIdentityOrPK(Relation rel)
return idxoid;
}
/*
* Check that we (the subscription owner) have sufficient privileges on the
* target relation to perform the given operation.
*/
static void
TargetPrivilegesCheck(Relation rel, AclMode mode)
{
Oid relid;
AclResult aclresult;
relid = RelationGetRelid(rel);
aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult,
get_relkind_objtype(rel->rd_rel->relkind),
get_rel_name(relid));
/*
* We lack the infrastructure to honor RLS policies. It might be possible
* to add such infrastructure here, but tablesync workers lack it, too, so
* we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
* but it seems dangerous to replicate a TRUNCATE and then refuse to
* replicate subsequent INSERTs, so we forbid all commands the same.
*/
if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
GetUserNameFromId(GetUserId(), true),
RelationGetRelationName(rel))));
}
/*
* Handle INSERT message.
*/
@@ -1613,6 +1647,7 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
ExecOpenIndices(relinfo, false);
/* Do the insert. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
ExecSimpleRelationInsert(relinfo, estate, remoteslot);
/* Cleanup. */
@@ -1796,6 +1831,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, remoteslot);
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
remoteslot);
}
@@ -1917,6 +1953,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, localslot);
/* Do the actual delete. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
}
else
@@ -2110,6 +2147,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
ExecOpenIndices(partrelinfo, false);
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
localslot, remoteslot_part);
ExecCloseIndices(partrelinfo);
@@ -2236,6 +2275,7 @@ apply_handle_truncate(StringInfo s)
}
remote_rels = lappend(remote_rels, rel);
TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
rels = lappend(rels, rel->localrel);
relids = lappend_oid(relids, rel->localreloid);
if (RelationIsLogicallyLogged(rel->localrel))
@@ -2273,6 +2313,7 @@ apply_handle_truncate(StringInfo s)
continue;
}
TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
rels = lappend(rels, childrel);
part_rels = lappend(part_rels, childrel);
relids = lappend_oid(relids, childrelid);
@@ -2915,6 +2956,7 @@ maybe_reread_subscription(void)
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,