1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-22 12:22:45 +03:00

Add large object access control.

A new system catalog pg_largeobject_metadata manages
ownership and access privileges of large objects.

KaiGai Kohei, reviewed by Jaime Casanova.
This commit is contained in:
Itagaki Takahiro
2009-12-11 03:34:57 +00:00
parent 64579962bb
commit f1325ce213
39 changed files with 1450 additions and 173 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.33 2009/08/04 16:08:36 tgl Exp $
* $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.34 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,8 +16,16 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_largeobject_metadata.h"
#include "catalog/toasting.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/bytea.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -27,113 +35,258 @@
/*
* Create a large object having the given LO identifier.
*
* We do this by inserting an empty first page, so that the object will
* appear to exist with size 0. Note that the unique index will reject
* an attempt to create a duplicate page.
* We create a new large object by inserting an entry into
* pg_largeobject_metadata without any data pages, so that the object
* will appear to exist with size 0.
*/
void
Oid
LargeObjectCreate(Oid loid)
{
Relation pg_largeobject;
Relation pg_lo_meta;
HeapTuple ntup;
Datum values[Natts_pg_largeobject];
bool nulls[Natts_pg_largeobject];
int i;
Oid loid_new;
Datum values[Natts_pg_largeobject_metadata];
bool nulls[Natts_pg_largeobject_metadata];
pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
RowExclusiveLock);
/*
* Form new tuple
* Insert metadata of the largeobject
*/
for (i = 0; i < Natts_pg_largeobject; i++)
{
values[i] = (Datum) NULL;
nulls[i] = false;
}
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
i = 0;
values[i++] = ObjectIdGetDatum(loid);
values[i++] = Int32GetDatum(0);
values[i++] = DirectFunctionCall1(byteain,
CStringGetDatum(""));
values[Anum_pg_largeobject_metadata_lomowner - 1]
= ObjectIdGetDatum(GetUserId());
nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true;
ntup = heap_form_tuple(pg_largeobject->rd_att, values, nulls);
ntup = heap_form_tuple(RelationGetDescr(pg_lo_meta),
values, nulls);
if (OidIsValid(loid))
HeapTupleSetOid(ntup, loid);
/*
* Insert it
*/
simple_heap_insert(pg_largeobject, ntup);
loid_new = simple_heap_insert(pg_lo_meta, ntup);
Assert(!OidIsValid(loid) || loid == loid_new);
/* Update indexes */
CatalogUpdateIndexes(pg_largeobject, ntup);
heap_close(pg_largeobject, RowExclusiveLock);
CatalogUpdateIndexes(pg_lo_meta, ntup);
heap_freetuple(ntup);
heap_close(pg_lo_meta, RowExclusiveLock);
return loid_new;
}
/*
* Drop a large object having the given LO identifier.
*
* When we drop a large object, it is necessary to drop both of metadata
* and data pages in same time.
*/
void
LargeObjectDrop(Oid loid)
{
bool found = false;
Relation pg_lo_meta;
Relation pg_largeobject;
ScanKeyData skey[1];
SysScanDesc sd;
SysScanDesc scan;
HeapTuple tuple;
pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
RowExclusiveLock);
pg_largeobject = heap_open(LargeObjectRelationId,
RowExclusiveLock);
/*
* Delete an entry from pg_largeobject_metadata
*/
ScanKeyInit(&skey[0],
Anum_pg_largeobject_loid,
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
ObjectIdGetDatum(loid));
pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
scan = systable_beginscan(pg_lo_meta,
LargeObjectMetadataOidIndexId, true,
SnapshotNow, 1, skey);
sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
SnapshotNow, 1, skey);
while ((tuple = systable_getnext(sd)) != NULL)
{
simple_heap_delete(pg_largeobject, &tuple->t_self);
found = true;
}
systable_endscan(sd);
heap_close(pg_largeobject, RowExclusiveLock);
if (!found)
tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", loid)));
}
bool
LargeObjectExists(Oid loid)
{
bool retval = false;
Relation pg_largeobject;
ScanKeyData skey[1];
SysScanDesc sd;
simple_heap_delete(pg_lo_meta, &tuple->t_self);
systable_endscan(scan);
/*
* See if we can find any tuples belonging to the specified LO
* Delete all the associated entries from pg_largeobject
*/
ScanKeyInit(&skey[0],
Anum_pg_largeobject_loid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
scan = systable_beginscan(pg_largeobject,
LargeObjectLOidPNIndexId, true,
SnapshotNow, 1, skey);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
simple_heap_delete(pg_largeobject, &tuple->t_self);
}
sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
systable_endscan(scan);
heap_close(pg_largeobject, RowExclusiveLock);
heap_close(pg_lo_meta, RowExclusiveLock);
}
/*
* LargeObjectAlterOwner
*
* Implementation of ALTER LARGE OBJECT statement
*/
void
LargeObjectAlterOwner(Oid loid, Oid newOwnerId)
{
Form_pg_largeobject_metadata form_lo_meta;
Relation pg_lo_meta;
ScanKeyData skey[1];
SysScanDesc scan;
HeapTuple oldtup;
HeapTuple newtup;
pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
RowExclusiveLock);
ScanKeyInit(&skey[0],
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
scan = systable_beginscan(pg_lo_meta,
LargeObjectMetadataOidIndexId, true,
SnapshotNow, 1, skey);
oldtup = systable_getnext(scan);
if (!HeapTupleIsValid(oldtup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", loid)));
form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(oldtup);
if (form_lo_meta->lomowner != newOwnerId)
{
Datum values[Natts_pg_largeobject_metadata];
bool nulls[Natts_pg_largeobject_metadata];
bool replaces[Natts_pg_largeobject_metadata];
Acl *newAcl;
Datum aclDatum;
bool isnull;
/* Superusers can always do it */
if (!superuser())
{
/*
* The 'lo_compat_privileges' is not checked here, because we
* don't have any access control features in the 8.4.x series
* or earlier release.
* So, it is not a place we can define a compatible behavior.
*/
/* Otherwise, must be owner of the existing object */
if (!pg_largeobject_ownercheck(loid, GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be owner of large object %u", loid)));
/* Must be able to become new owner */
check_is_member_of_role(GetUserId(), newOwnerId);
}
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(nulls));
values[Anum_pg_largeobject_metadata_lomowner - 1]
= ObjectIdGetDatum(newOwnerId);
replaces[Anum_pg_largeobject_metadata_lomowner - 1] = true;
/*
* Determine the modified ACL for the new owner.
* This is only necessary when the ACL is non-null.
*/
aclDatum = heap_getattr(oldtup,
Anum_pg_largeobject_metadata_lomacl,
RelationGetDescr(pg_lo_meta), &isnull);
if (!isnull)
{
newAcl = aclnewowner(DatumGetAclP(aclDatum),
form_lo_meta->lomowner, newOwnerId);
values[Anum_pg_largeobject_metadata_lomacl - 1]
= PointerGetDatum(newAcl);
replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true;
}
newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_lo_meta),
values, nulls, replaces);
simple_heap_update(pg_lo_meta, &newtup->t_self, newtup);
CatalogUpdateIndexes(pg_lo_meta, newtup);
heap_freetuple(newtup);
/* Update owner dependency reference */
changeDependencyOnOwner(LargeObjectRelationId,
loid, newOwnerId);
}
systable_endscan(scan);
heap_close(pg_lo_meta, RowExclusiveLock);
}
/*
* LargeObjectExists
*
* Currently, we don't use system cache to contain metadata of
* large objects, because massive number of large objects can
* consume not a small amount of process local memory.
*
* Note that LargeObjectExists always scans the system catalog
* with SnapshotNow, so it is unavailable to use to check
* existence in read-only accesses.
*/
bool
LargeObjectExists(Oid loid)
{
Relation pg_lo_meta;
ScanKeyData skey[1];
SysScanDesc sd;
HeapTuple tuple;
bool retval = false;
ScanKeyInit(&skey[0],
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
AccessShareLock);
sd = systable_beginscan(pg_lo_meta,
LargeObjectMetadataOidIndexId, true,
SnapshotNow, 1, skey);
if (systable_getnext(sd) != NULL)
tuple = systable_getnext(sd);
if (HeapTupleIsValid(tuple))
retval = true;
systable_endscan(sd);
heap_close(pg_largeobject, AccessShareLock);
heap_close(pg_lo_meta, AccessShareLock);
return retval;
}