1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-07 19:06:32 +03:00

Adjust lo_open() so that specifying INV_READ without INV_WRITE creates

a descriptor that uses the current transaction snapshot, rather than
SnapshotNow as it did before (and still does if INV_WRITE is set).
This means pg_dump will now dump a consistent snapshot of large object
contents, as it never could do before.  Also, add a lo_create() function
that is similar to lo_creat() but allows the desired OID of the large
object to be specified.  This will simplify pg_restore considerably
(but I'll fix that in a separate commit).
This commit is contained in:
Tom Lane
2005-06-13 02:26:53 +00:00
parent f52a34229b
commit a2fb7b8a1f
11 changed files with 265 additions and 96 deletions

View File

@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.110 2005/04/14 20:03:25 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.111 2005/06/13 02:26:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -114,6 +114,42 @@ close_lo_relation(bool isCommit)
}
/*
* Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
* read with can be specified.
*/
static bool
myLargeObjectExists(Oid loid, Snapshot snapshot)
{
bool retval = false;
Relation pg_largeobject;
ScanKeyData skey[1];
SysScanDesc sd;
/*
* See if we can find any tuples belonging to the specified LO
*/
ScanKeyInit(&skey[0],
Anum_pg_largeobject_loid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
snapshot, 1, skey);
if (systable_getnext(sd) != NULL)
retval = true;
systable_endscan(sd);
heap_close(pg_largeobject, AccessShareLock);
return retval;
}
static int32
getbytealen(bytea *data)
{
@@ -125,58 +161,44 @@ getbytealen(bytea *data)
/*
* inv_create -- create a new large object.
* inv_create -- create a new large object
*
* Arguments:
* flags
* Arguments:
* lobjId - OID to use for new large object, or InvalidOid to pick one
*
* Returns:
* large object descriptor, appropriately filled in.
* Returns:
* OID of new object
*
* If lobjId is not InvalidOid, then an error occurs if the OID is already
* in use.
*/
LargeObjectDesc *
inv_create(int flags)
Oid
inv_create(Oid lobjId)
{
Oid file_oid;
LargeObjectDesc *retval;
/*
* Allocate an OID to be the LO's identifier.
* Allocate an OID to be the LO's identifier, unless we were told
* what to use. In event of collision with an existing ID, loop
* to find a free one.
*/
file_oid = newoid();
/* Check for duplicate (shouldn't happen) */
if (LargeObjectExists(file_oid))
elog(ERROR, "large object %u already exists", file_oid);
if (!OidIsValid(lobjId))
{
do {
lobjId = newoid();
} while (LargeObjectExists(lobjId));
}
/*
* Create the LO by writing an empty first page for it in
* pg_largeobject
* pg_largeobject (will fail if duplicate)
*/
LargeObjectCreate(file_oid);
LargeObjectCreate(lobjId);
/*
* Advance command counter so that new tuple will be seen by later
* large-object operations in this transaction.
* Advance command counter to make new tuple visible to later operations.
*/
CommandCounterIncrement();
/*
* Prepare LargeObjectDesc data structure for accessing LO
*/
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
retval->id = file_oid;
retval->subid = GetCurrentSubTransactionId();
retval->offset = 0;
if (flags & INV_WRITE)
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
else if (flags & INV_READ)
retval->flags = IFS_RDLOCK;
else
elog(ERROR, "invalid flags: %d", flags);
return retval;
return lobjId;
}
/*
@@ -190,11 +212,6 @@ inv_open(Oid lobjId, int flags)
{
LargeObjectDesc *retval;
if (!LargeObjectExists(lobjId))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", lobjId)));
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
retval->id = lobjId;
@@ -202,12 +219,25 @@ inv_open(Oid lobjId, int flags)
retval->offset = 0;
if (flags & INV_WRITE)
{
retval->snapshot = SnapshotNow;
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
}
else if (flags & INV_READ)
{
/* be sure to copy snap into fscxt */
retval->snapshot = CopySnapshot(ActiveSnapshot);
retval->flags = IFS_RDLOCK;
}
else
elog(ERROR, "invalid flags: %d", flags);
/* Can't use LargeObjectExists here because it always uses SnapshotNow */
if (!myLargeObjectExists(lobjId, retval->snapshot))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", lobjId)));
return retval;
}
@@ -218,6 +248,8 @@ void
inv_close(LargeObjectDesc *obj_desc)
{
Assert(PointerIsValid(obj_desc));
if (obj_desc->snapshot != SnapshotNow)
FreeSnapshot(obj_desc->snapshot);
pfree(obj_desc);
}
@@ -268,7 +300,7 @@ inv_getsize(LargeObjectDesc *obj_desc)
ObjectIdGetDatum(obj_desc->id));
sd = index_beginscan(lo_heap_r, lo_index_r,
SnapshotNow, 1, skey);
obj_desc->snapshot, 1, skey);
/*
* Because the pg_largeobject index is on both loid and pageno, but we
@@ -379,7 +411,7 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Int32GetDatum(pageno));
sd = index_beginscan(lo_heap_r, lo_index_r,
SnapshotNow, 2, skey);
obj_desc->snapshot, 2, skey);
while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
{
@@ -470,6 +502,13 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Assert(PointerIsValid(obj_desc));
Assert(buf != NULL);
/* enforce writability because snapshot is probably wrong otherwise */
if ((obj_desc->flags & IFS_WRLOCK) == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("large object %u was not opened for writing",
obj_desc->id)));
if (nbytes <= 0)
return 0;
@@ -488,7 +527,7 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Int32GetDatum(pageno));
sd = index_beginscan(lo_heap_r, lo_index_r,
SnapshotNow, 2, skey);
obj_desc->snapshot, 2, skey);
oldtuple = NULL;
olddata = NULL;