diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 1ac858642db..6a5beee94d7 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -2,7 +2,7 @@ # # Makefile for catalog # -# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.29 2000/10/21 15:55:21 momjian Exp $ +# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.30 2000/10/22 05:27:10 momjian Exp $ # #------------------------------------------------------------------------- @@ -11,8 +11,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = catalog.o heap.o index.o indexing.o aclchk.o \ - pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \ - pg_type.o + pg_aggregate.o pg_operator.o pg_proc.o pg_type.o BKIFILES = global.bki template1.bki global.description template1.description @@ -30,7 +29,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_proc.h pg_type.h pg_attribute.h pg_class.h \ pg_inherits.h pg_index.h pg_statistic.h \ pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ - pg_language.h pg_largeobject.h \ + pg_language.h \ pg_aggregate.h pg_ipl.h pg_inheritproc.h \ pg_rewrite.h pg_listener.h pg_description.h indexing.h \ ) diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c index faa3dc6a421..342896a93b2 100644 --- a/src/backend/catalog/indexing.c +++ b/src/backend/catalog/indexing.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.70 2000/10/21 15:55:21 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.71 2000/10/22 05:27:10 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -51,8 +51,6 @@ char *Name_pg_inherits_indices[Num_pg_inherits_indices] = {InheritsRelidSeqnoIndex}; char *Name_pg_language_indices[Num_pg_language_indices] = {LanguageOidIndex, LanguageNameIndex}; -char *Name_pg_largeobject_indices[Num_pg_largeobject_indices] = -{LargeobjectLOIdIndex, LargeobjectLOIdPNIndex}; char *Name_pg_listener_indices[Num_pg_listener_indices] = {ListenerPidRelnameIndex}; char *Name_pg_opclass_indices[Num_pg_opclass_indices] = diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c deleted file mode 100644 index ace63d32634..00000000000 --- a/src/backend/catalog/pg_largeobject.c +++ /dev/null @@ -1,135 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pg_largeobject.c - * routines to support manipulation of the pg_largeobject relation - * - * Portions Copyright (c) 1996-2000, PostgreSQL, Inc - * Portions Copyright (c) 1994, Regents of the University of California - * - * - * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.3 2000/10/21 15:55:21 momjian Exp $ - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include "access/genam.h" -#include "access/heapam.h" -#include "catalog/catname.h" -#include "catalog/indexing.h" -#include "catalog/pg_largeobject.h" -#include "miscadmin.h" -#include "utils/fmgroids.h" - -bytea *_byteain(const char *data, int32 size); - -bytea *_byteain(const char *data, int32 size) { - bytea *result; - - result = (bytea *)palloc(size + VARHDRSZ); - result->vl_len = size + VARHDRSZ; - if (size > 0) - memcpy(result->vl_dat, data, size); - - return result; -} - -Oid LargeobjectCreate(Oid loid) { - Oid retval; - Relation pg_largeobject; - HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData)); - Relation idescs[Num_pg_largeobject_indices]; - Datum values[Natts_pg_largeobject]; - char nulls[Natts_pg_largeobject]; - int i; - - for (i=0; ird_att, values, nulls); - retval = heap_insert(pg_largeobject, ntup); - - if (!IsIgnoringSystemIndexes()) { - CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs); - CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup); - CatalogCloseIndices(Num_pg_largeobject_indices, idescs); - } - - heap_close(pg_largeobject, RowExclusiveLock); - heap_freetuple(ntup); - - CommandCounterIncrement(); - - return retval; -} - -void LargeobjectDrop(Oid loid) { - Relation pg_largeobject; - Relation pg_lo_id; - ScanKeyData skey; - IndexScanDesc sd = (IndexScanDesc) NULL; - RetrieveIndexResult indexRes; - int found = 0; - - ScanKeyEntryInitialize(&skey, - (bits16) 0x0, - (AttrNumber) 1, - (RegProcedure) F_OIDEQ, - ObjectIdGetDatum(loid)); - - pg_largeobject = heap_openr(LargeobjectRelationName, RowShareLock); - pg_lo_id = index_openr(LargeobjectLOIdIndex); - - sd = index_beginscan(pg_lo_id, false, 1, &skey); - - while((indexRes = index_getnext(sd, ForwardScanDirection))) { - found++; - heap_delete(pg_largeobject, &indexRes->heap_iptr, NULL); - pfree(indexRes); - } - - index_endscan(sd); - - index_close(pg_lo_id); - heap_close(pg_largeobject, RowShareLock); - if (found == 0) - elog(ERROR, "LargeobjectDrop: large object %d not found", loid); -} - -int LargeobjectFind(Oid loid) { - int retval = 0; - Relation pg_lo_id; - ScanKeyData skey; - IndexScanDesc sd = (IndexScanDesc) NULL; - RetrieveIndexResult indexRes; - - ScanKeyEntryInitialize(&skey, - (bits16) 0x0, - (AttrNumber) 1, - (RegProcedure) F_OIDEQ, - ObjectIdGetDatum(loid)); - - pg_lo_id = index_openr(LargeobjectLOIdIndex); - - sd = index_beginscan(pg_lo_id, false, 1, &skey); - - if ((indexRes = index_getnext(sd, ForwardScanDirection))) { - retval = 1; - pfree(indexRes); - } - - index_endscan(sd); - - index_close(pg_lo_id); - return retval; -} - diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c index 30b50012714..bb5c7f6e556 100644 --- a/src/backend/libpq/be-fsstubs.c +++ b/src/backend/libpq/be-fsstubs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.53 2000/10/21 15:55:22 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.54 2000/10/22 05:27:12 momjian Exp $ * * NOTES * This should be moved to a more appropriate place. It is here @@ -267,7 +267,7 @@ lo_creat(PG_FUNCTION_ARGS) PG_RETURN_OID(InvalidOid); } - lobjId = lobjDesc->id; + lobjId = RelationGetRelid(lobjDesc->heap_r); inv_close(lobjDesc); @@ -512,10 +512,8 @@ lo_commit(bool isCommit) { if (cookies[i] != NULL) { -/* if (isCommit) inv_cleanindex(cookies[i]); -*/ cookies[i] = NULL; } } diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c index a245c818631..5b7df0562ad 100644 --- a/src/backend/storage/large_object/inv_api.c +++ b/src/backend/storage/large_object/inv_api.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.77 2000/10/21 15:55:24 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.78 2000/10/22 05:27:15 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -22,34 +22,58 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/nbtree.h" -#include "access/htup.h" #include "catalog/catalog.h" -#include "catalog/catname.h" #include "catalog/heap.h" #include "catalog/index.h" -#include "catalog/indexing.h" #include "catalog/pg_opclass.h" -#include "catalog/pg_largeobject.h" #include "catalog/pg_type.h" #include "libpq/libpq-fs.h" #include "miscadmin.h" #include "storage/large_object.h" #include "storage/smgr.h" #include "utils/fmgroids.h" -#include "utils/builtins.h" +#include "utils/relcache.h" -#include +/* + * Warning, Will Robinson... In order to pack data into an inversion + * file as densely as possible, we violate the class abstraction here. + * When we're appending a new tuple to the end of the table, we check + * the last page to see how much data we can put on it. If it's more + * than IMINBLK, we write enough to fill the page. This limits external + * fragmentation. In no case can we write more than IMAXBLK, since + * the 8K postgres page size less overhead leaves only this much space + * for data. + */ -#define IBLKSIZE (MaxTupleSize - MinHeapTupleBitmapSize - sizeof(int32) * 3) +/* + * In order to prevent buffer leak on transaction commit, large object + * scan index handling has been modified. Indexes are persistant inside + * a transaction but may be closed between two calls to this API (when + * transaction is committed while object is opened, or when no + * transaction is active). Scan indexes are thus now reinitialized using + * the object current offset. [PA] + * + * Some cleanup has been also done for non freed memory. + * + * For subsequent notes, [PA] is Pascal André + */ -/* Defined in backend/storage/catalog/large_object.c */ -bytea *_byteain(const char *data, int32 size); +#define IFREESPC(p) (PageGetFreeSpace(p) - \ + MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \ + MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \ + sizeof(double)) +#define IMAXBLK 8092 +#define IMINBLK 512 -static int32 getbytealen(bytea *data) { - if (VARSIZE(data) < VARHDRSZ) - elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error."); - return (VARSIZE(data) - VARHDRSZ); -} +/* non-export function prototypes */ +static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer, + Page page, char *dbuf, int nwrite); +static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer); +static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes); +static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes, + HeapTuple tuple, Buffer buffer); +static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple); +static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln); /* * inv_create -- create a new large object. @@ -60,13 +84,19 @@ static int32 getbytealen(bytea *data) { * Returns: * large object descriptor, appropriately filled in. */ - LargeObjectDesc * inv_create(int flags) { - int file_oid; LargeObjectDesc *retval; - + Oid file_oid; + Relation r; + Relation indr; + TupleDesc tupdesc; + IndexInfo *indexInfo; + Oid classObjectId[1]; + char objname[NAMEDATALEN]; + char indname[NAMEDATALEN]; + /* * add one here since the pg_class tuple created will have the next * oid and we want to have the relation name to correspond to the @@ -74,25 +104,104 @@ inv_create(int flags) */ file_oid = newoid() + 1; - if (LargeobjectFind(file_oid) == 1) - elog(ERROR, "inv_create: large object %d already exists. This is internal error.", file_oid); + /* come up with some table names */ + sprintf(objname, "xinv%u", file_oid); + sprintf(indname, "xinx%u", file_oid); + + if (RelnameFindRelid(objname) != InvalidOid) + elog(ERROR, + "internal error: %s already exists -- cannot create large obj", + objname); + if (RelnameFindRelid(indname) != InvalidOid) + elog(ERROR, + "internal error: %s already exists -- cannot create large obj", + indname); + + /* this is pretty painful... want a tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, + "olastbye", + INT4OID, + -1, 0, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, + "odata", + BYTEAOID, + -1, 0, false); + + /* + * First create the table to hold the inversion large object. It will + * be located on whatever storage manager the user requested. + */ + + heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT, + false, false); + + /* make the relation visible in this transaction */ + CommandCounterIncrement(); + + /*-------------------- + * We hold AccessShareLock on any large object we have open + * by inv_create or inv_open; it is released by inv_close. + * Note this will not conflict with ExclusiveLock or ShareLock + * that we acquire when actually reading/writing; it just prevents + * deletion of the large object while we have it open. + *-------------------- + */ + r = heap_openr(objname, AccessShareLock); + + /* + * Now create a btree index on the relation's olastbyte attribute to + * make seeks go faster. + */ + indexInfo = makeNode(IndexInfo); + indexInfo->ii_NumIndexAttrs = 1; + indexInfo->ii_NumKeyAttrs = 1; + indexInfo->ii_KeyAttrNumbers[0] = 1; + indexInfo->ii_Predicate = NULL; + indexInfo->ii_FuncOid = InvalidOid; + indexInfo->ii_Unique = false; + + classObjectId[0] = INT4_OPS_OID; + + index_create(objname, indname, indexInfo, + BTREE_AM_OID, classObjectId, + false, false, false); + + /* make the index visible in this transaction */ + CommandCounterIncrement(); + + indr = index_openr(indname); + + if (!RelationIsValid(indr)) + { + elog(ERROR, "cannot create index for large obj on %s under inversion", + DatumGetCString(DirectFunctionCall1(smgrout, + Int16GetDatum(DEFAULT_SMGR)))); + } retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc)); - if (flags & INV_WRITE) { - retval->flags = IFS_WRLOCK | IFS_RDLOCK; - retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock); - } else if (flags & INV_READ) { - retval->flags = IFS_RDLOCK; - retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock); - } else - elog(ERROR, "inv_create: invalid flags: %d", flags); + retval->heap_r = r; + retval->index_r = indr; + retval->iscan = (IndexScanDesc) NULL; + retval->hdesc = RelationGetDescr(r); + retval->idesc = RelationGetDescr(indr); + retval->offset = retval->lowbyte = retval->highbyte = 0; + ItemPointerSetInvalid(&(retval->htid)); + retval->flags = 0; + + if (flags & INV_WRITE) + { + LockRelation(r, ExclusiveLock); + retval->flags = IFS_WRLOCK | IFS_RDLOCK; + } + else if (flags & INV_READ) + { + LockRelation(r, ShareLock); + retval->flags = IFS_RDLOCK; + } + retval->flags |= IFS_ATEOF; /* since we know the object is empty */ - retval->flags |= IFS_ATEOF; - retval->index_r = index_openr(LargeobjectLOIdPNIndex); - retval->offset = 0; - retval->id = file_oid; - (void)LargeobjectCreate(file_oid); return retval; } @@ -100,24 +209,46 @@ LargeObjectDesc * inv_open(Oid lobjId, int flags) { LargeObjectDesc *retval; + Relation r; + char *indname; + Relation indrel; - if (LargeobjectFind(lobjId) == 0) - elog(ERROR, "inv_open: large object %d not found", lobjId); - - retval = (LargeObjectDesc *)palloc(sizeof(LargeObjectDesc)); + r = heap_open(lobjId, AccessShareLock); - if (flags & INV_WRITE) { + indname = pstrdup(RelationGetRelationName(r)); + + /* + * hack hack hack... we know that the fourth character of the + * relation name is a 'v', and that the fourth character of the index + * name is an 'x', and that they're otherwise identical. + */ + indname[3] = 'x'; + indrel = index_openr(indname); + + if (!RelationIsValid(indrel)) + return (LargeObjectDesc *) NULL; + + retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc)); + + retval->heap_r = r; + retval->index_r = indrel; + retval->iscan = (IndexScanDesc) NULL; + retval->hdesc = RelationGetDescr(r); + retval->idesc = RelationGetDescr(indrel); + retval->offset = retval->lowbyte = retval->highbyte = 0; + ItemPointerSetInvalid(&(retval->htid)); + retval->flags = 0; + + if (flags & INV_WRITE) + { + LockRelation(r, ExclusiveLock); retval->flags = IFS_WRLOCK | IFS_RDLOCK; - retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock); - } else if (flags & INV_READ) { + } + else if (flags & INV_READ) + { + LockRelation(r, ShareLock); retval->flags = IFS_RDLOCK; - retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock); - } else - elog(ERROR, "inv_open: invalid flags: %d", flags); - - retval->index_r = index_openr(LargeobjectLOIdPNIndex); - retval->offset = 0; - retval->id = lobjId; + } return retval; } @@ -130,11 +261,15 @@ inv_close(LargeObjectDesc *obj_desc) { Assert(PointerIsValid(obj_desc)); - if (obj_desc->flags & IFS_WRLOCK) - heap_close(obj_desc->heap_r, RowExclusiveLock); - else if (obj_desc->flags & IFS_RDLOCK) - heap_close(obj_desc->heap_r, AccessShareLock); + if (obj_desc->iscan != (IndexScanDesc) NULL) + { + index_endscan(obj_desc->iscan); + obj_desc->iscan = NULL; + } + index_close(obj_desc->index_r); + heap_close(obj_desc->heap_r, AccessShareLock); + pfree(obj_desc); } @@ -146,7 +281,24 @@ inv_close(LargeObjectDesc *obj_desc) int inv_drop(Oid lobjId) { - LargeobjectDrop(lobjId); + Relation r; + + r = RelationIdGetRelation(lobjId); + if (!RelationIsValid(r)) + return -1; + + if (r->rd_rel->relkind != RELKIND_LOBJECT) + { + /* drop relcache refcount from RelationIdGetRelation */ + RelationDecrementReferenceCount(r); + return -1; + } + + /* + * Since heap_drop_with_catalog will destroy the relcache entry, + * there's no need to drop the refcount in this path. + */ + heap_drop_with_catalog(RelationGetRelationName(r), false); return 1; } @@ -212,75 +364,71 @@ inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf) #endif -static uint32 inv_getsize(LargeObjectDesc *obj_desc) { - uint32 found = 0; - uint32 lastbyte = 0; - ScanKeyData skey; - IndexScanDesc sd = (IndexScanDesc) NULL; - RetrieveIndexResult indexRes; - HeapTupleData tuple; - Buffer buffer; - Form_pg_largeobject data; - - Assert(PointerIsValid(obj_desc)); - - ScanKeyEntryInitialize(&skey, - (bits16) 0x0, - (AttrNumber) 1, - (RegProcedure) F_OIDEQ, - ObjectIdGetDatum(obj_desc->id)); - - sd = index_beginscan(obj_desc->index_r, true, 1, &skey); - tuple.t_datamcxt = CurrentMemoryContext; - tuple.t_data = NULL; - while ((indexRes = index_getnext(sd, ForwardScanDirection))) { - tuple.t_self = indexRes->heap_iptr; - heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer); - pfree(indexRes); - if (tuple.t_data == NULL) - continue; - found++; - data = (Form_pg_largeobject) GETSTRUCT(&tuple); - lastbyte = data->pageno * IBLKSIZE + getbytealen(&(data->data)); - ReleaseBuffer(buffer); - break; - } - - index_endscan(sd); - - if (found == 0) - elog(ERROR, "inv_getsize: large object %d not found", obj_desc->id); - return lastbyte; -} - int inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) { + int oldOffset; + Datum d; + ScanKeyData skey; + Assert(PointerIsValid(obj_desc)); - switch (whence) { - case SEEK_SET: - if (offset < 0) - elog(ERROR, "inv_seek: invalid offset: %d", offset); - obj_desc->offset = offset; - break; - case SEEK_CUR: - if ((obj_desc->offset + offset) < 0) - elog(ERROR, "inv_seek: invalid offset: %d", offset); - obj_desc->offset += offset; - break; - case SEEK_END: - { - int4 size = inv_getsize(obj_desc); - if (offset > size) - elog(ERROR, "inv_seek: invalid offset"); - obj_desc->offset = size - offset; - } - break; - default: - elog(ERROR, "inv_seek: invalid whence: %d", whence); + if (whence == SEEK_CUR) + { + offset += obj_desc->offset; /* calculate absolute position */ } - return obj_desc->offset; + else if (whence == SEEK_END) + { + /* need read lock for getsize */ + if (!(obj_desc->flags & IFS_RDLOCK)) + { + LockRelation(obj_desc->heap_r, ShareLock); + obj_desc->flags |= IFS_RDLOCK; + } + offset += _inv_getsize(obj_desc->heap_r, + obj_desc->hdesc, + obj_desc->index_r); + } + /* now we can assume that the operation is SEEK_SET */ + + /* + * Whenever we do a seek, we turn off the EOF flag bit to force + * ourselves to check for real on the next read. + */ + + obj_desc->flags &= ~IFS_ATEOF; + oldOffset = obj_desc->offset; + obj_desc->offset = offset; + + /* try to avoid doing any work, if we can manage it */ + if (offset >= obj_desc->lowbyte + && offset <= obj_desc->highbyte + && oldOffset <= obj_desc->highbyte + && obj_desc->iscan != (IndexScanDesc) NULL) + return offset; + + /* + * To do a seek on an inversion file, we start an index scan that will + * bring us to the right place. Each tuple in an inversion file + * stores the offset of the last byte that appears on it, and we have + * an index on this. + */ + if (obj_desc->iscan != (IndexScanDesc) NULL) + { + d = Int32GetDatum(offset); + btmovescan(obj_desc->iscan, d); + } + else + { + ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE, + Int32GetDatum(offset)); + + obj_desc->iscan = index_beginscan(obj_desc->index_r, + (bool) 0, (uint16) 1, + &skey); + } + + return offset; } int @@ -294,259 +442,862 @@ inv_tell(LargeObjectDesc *obj_desc) int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes) { - uint32 nread = 0; - uint32 n; - uint32 off; - uint32 len; - uint32 found = 0; - uint32 pageno = obj_desc->offset / IBLKSIZE; - ScanKeyData skey[2]; - IndexScanDesc sd = (IndexScanDesc) NULL; - RetrieveIndexResult indexRes; - HeapTupleData tuple; - Buffer buffer; - Form_pg_largeobject data; + HeapTupleData tuple; + int nread; + int off; + int ncopy; + Datum d; + struct varlena *fsblock; + bool isNull; Assert(PointerIsValid(obj_desc)); Assert(buf != NULL); - ScanKeyEntryInitialize(&skey[0], - (bits16) 0x0, - (AttrNumber) 1, - (RegProcedure) F_OIDEQ, - ObjectIdGetDatum(obj_desc->id)); + /* if we're already at EOF, we don't need to do any work here */ + if (obj_desc->flags & IFS_ATEOF) + return 0; - ScanKeyEntryInitialize(&skey[1], - (bits16) 0x0, - (AttrNumber) 2, - (RegProcedure) F_INT4GE, - Int32GetDatum(pageno)); - - sd = index_beginscan(obj_desc->index_r, false, 2, skey); - tuple.t_datamcxt = CurrentMemoryContext; - tuple.t_data = NULL; - while ((indexRes = index_getnext(sd, ForwardScanDirection))) { - tuple.t_self = indexRes->heap_iptr; - heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer); - pfree(indexRes); - - if (tuple.t_data == NULL) - continue; - - found++; - data = (Form_pg_largeobject) GETSTRUCT(&tuple); - if (data->pageno != pageno) { - ReleaseBuffer(buffer); - index_endscan(sd); - return 0; - } - - len = getbytealen(&(data->data)); - off = obj_desc->offset % IBLKSIZE; - if (off == len) { - ReleaseBuffer(buffer); - break; - } - if (off > len) { - ReleaseBuffer(buffer); - index_endscan(sd); - return 0; - } - n = len - off; - - n = (n < (nbytes - nread)) ? n : (nbytes - nread); - memcpy(buf + nread, VARDATA(&(data->data)) + off, n); - nread += n; - obj_desc->offset += n; - - ReleaseBuffer(buffer); - pageno++; - if (nread == nbytes) - break; + /* make sure we obey two-phase locking */ + if (!(obj_desc->flags & IFS_RDLOCK)) + { + LockRelation(obj_desc->heap_r, ShareLock); + obj_desc->flags |= IFS_RDLOCK; } - index_endscan(sd); + nread = 0; - if (found == 0) - return 0; + /* fetch a block at a time */ + while (nread < nbytes) + { + Buffer buffer; + + /* fetch an inversion file system block */ + inv_fetchtup(obj_desc, &tuple, &buffer); + + if (tuple.t_data == NULL) + { + obj_desc->flags |= IFS_ATEOF; + break; + } + + /* copy the data from this block into the buffer */ + d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull); + fsblock = (struct varlena *) DatumGetPointer(d); + ReleaseBuffer(buffer); + + /* + * If block starts beyond current seek point, then we are looking + * at a "hole" (unwritten area) in the object. Return zeroes for + * the "hole". + */ + if (obj_desc->offset < obj_desc->lowbyte) + { + int nzeroes = obj_desc->lowbyte - obj_desc->offset; + + if (nzeroes > (nbytes - nread)) + nzeroes = (nbytes - nread); + MemSet(buf, 0, nzeroes); + buf += nzeroes; + nread += nzeroes; + obj_desc->offset += nzeroes; + if (nread >= nbytes) + break; + } + + off = obj_desc->offset - obj_desc->lowbyte; + ncopy = obj_desc->highbyte - obj_desc->offset + 1; + if (ncopy > (nbytes - nread)) + ncopy = (nbytes - nread); + memmove(buf, &(fsblock->vl_dat[off]), ncopy); + + /* move pointers past the amount we just read */ + buf += ncopy; + nread += ncopy; + obj_desc->offset += ncopy; + } return nread; } -static int inv_write_existing(LargeObjectDesc *obj_desc, char *buf, int nbytes, int *found) { - uint32 n = 0; - uint32 off; - uint32 len; - int i; - HeapTupleData tuple; - HeapTuple newtup; - Buffer buffer; - Form_pg_largeobject data; - ScanKeyData skey[2]; - IndexScanDesc sd = (IndexScanDesc) NULL; - RetrieveIndexResult indexRes; - Relation idescs[Num_pg_largeobject_indices]; - Datum values[Natts_pg_largeobject]; - char nulls[Natts_pg_largeobject]; - char replace[Natts_pg_largeobject]; +int +inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) +{ + HeapTupleData tuple; + int nwritten; + int tuplen; Assert(PointerIsValid(obj_desc)); Assert(buf != NULL); - ScanKeyEntryInitialize(&skey[0], - (bits16) 0, - (AttrNumber) 1, - (RegProcedure) F_OIDEQ, - ObjectIdGetDatum(obj_desc->id)); + /* + * Make sure we obey two-phase locking. A write lock entitles you to + * read the relation, as well. + */ - ScanKeyEntryInitialize(&skey[1], - (bits16) 0x0, - (AttrNumber) 2, - (RegProcedure) F_INT4EQ, - Int32GetDatum(obj_desc->offset / IBLKSIZE)); + if (!(obj_desc->flags & IFS_WRLOCK)) + { + LockRelation(obj_desc->heap_r, ExclusiveLock); + obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK); + } - CommandCounterIncrement(); - sd = index_beginscan(obj_desc->index_r, false, 2, skey); - tuple.t_datamcxt = CurrentMemoryContext; - tuple.t_data = NULL; - while ((indexRes = index_getnext(sd, ForwardScanDirection))) { - tuple.t_self = indexRes->heap_iptr; - heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer); - pfree(indexRes); - if (tuple.t_data != NULL) + nwritten = 0; + + /* write a block at a time */ + while (nwritten < nbytes) + { + Buffer buffer; + + /* + * Fetch the current inversion file system block. We can skip + * the work if we already know we are at EOF. + */ + + if (obj_desc->flags & IFS_ATEOF) + tuple.t_data = NULL; + else + inv_fetchtup(obj_desc, &tuple, &buffer); + + /* either append or replace a block, as required */ + if (tuple.t_data == NULL) + tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten); + else + { + if (obj_desc->offset > obj_desc->highbyte) + { + tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten); + ReleaseBuffer(buffer); + } + else + tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer); + + /* + * inv_wrold() has already issued WriteBuffer() which has + * decremented local reference counter (LocalRefCount). So we + * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4 + */ + } + + /* move pointers past the amount we just wrote */ + buf += tuplen; + nwritten += tuplen; + obj_desc->offset += tuplen; + } + + /* that's it */ + return nwritten; +} + +/* + * inv_cleanindex + * Clean opened indexes for large objects, and clears current result. + * This is necessary on transaction commit in order to prevent buffer + * leak. + * This function must be called for each opened large object. + * [ PA, 7/17/98 ] + */ +void +inv_cleanindex(LargeObjectDesc *obj_desc) +{ + Assert(PointerIsValid(obj_desc)); + + if (obj_desc->iscan == (IndexScanDesc) NULL) + return; + + index_endscan(obj_desc->iscan); + obj_desc->iscan = (IndexScanDesc) NULL; + + ItemPointerSetInvalid(&(obj_desc->htid)); +} + +/* + * inv_fetchtup -- Fetch an inversion file system block. + * + * This routine finds the file system block containing the offset + * recorded in the obj_desc structure. Later, we need to think about + * the effects of non-functional updates (can you rewrite the same + * block twice in a single transaction?), but for now, we won't bother. + * + * Parameters: + * obj_desc -- the object descriptor. + * bufP -- pointer to a buffer in the buffer cache; caller + * must free this. + * + * Returns: + * A heap tuple containing the desired block, or NULL if no + * such tuple exists. + */ +static void +inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer) +{ + RetrieveIndexResult res; + Datum d; + int firstbyte, + lastbyte; + struct varlena *fsblock; + bool isNull; + + /* + * If we've exhausted the current block, we need to get the next one. + * When we support time travel and non-functional updates, we will + * need to loop over the blocks, rather than just have an 'if', in + * order to find the one we're really interested in. + */ + + if (obj_desc->offset > obj_desc->highbyte + || obj_desc->offset < obj_desc->lowbyte + || !ItemPointerIsValid(&(obj_desc->htid))) + { + ScanKeyData skey; + + ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE, + Int32GetDatum(obj_desc->offset)); + + /* initialize scan key if not done */ + if (obj_desc->iscan == (IndexScanDesc) NULL) + { + + /* + * As scan index may be prematurely closed (on commit), we + * must use object current offset (was 0) to reinitialize the + * entry [ PA ]. + */ + obj_desc->iscan = index_beginscan(obj_desc->index_r, + (bool) 0, (uint16) 1, + &skey); + } + else + index_rescan(obj_desc->iscan, false, &skey); + + do + { + res = index_getnext(obj_desc->iscan, ForwardScanDirection); + + if (res == (RetrieveIndexResult) NULL) + { + ItemPointerSetInvalid(&(obj_desc->htid)); + tuple->t_datamcxt = NULL; + tuple->t_data = NULL; + return; + } + + /* + * For time travel, we need to use the actual time qual here, + * rather that NowTimeQual. We currently have no way to pass + * a time qual in. + * + * This is now valid for snapshot !!! And should be fixed in some + * way... - vadim 07/28/98 + * + */ + tuple->t_self = res->heap_iptr; + heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer); + pfree(res); + } while (tuple->t_data == NULL); + + /* remember this tid -- we may need it for later reads/writes */ + ItemPointerCopy(&(tuple->t_self), &obj_desc->htid); + } + else + { + tuple->t_self = obj_desc->htid; + heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer); + if (tuple->t_data == NULL) + elog(ERROR, "inv_fetchtup: heap_fetch failed"); + } + + /* + * By here, we have the heap tuple we're interested in. We cache the + * upper and lower bounds for this block in the object descriptor and + * return the tuple. + */ + + d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull); + lastbyte = (int32) DatumGetInt32(d); + d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull); + fsblock = (struct varlena *) DatumGetPointer(d); + + /* + * order of + and - is important -- these are unsigned quantites near + * 0 + */ + firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len; + + obj_desc->lowbyte = firstbyte; + obj_desc->highbyte = lastbyte; + + return; +} + +/* + * inv_wrnew() -- append a new filesystem block tuple to the inversion + * file. + * + * In response to an inv_write, we append one or more file system + * blocks to the class containing the large object. We violate the + * class abstraction here in order to pack things as densely as we + * are able. We examine the last page in the relation, and write + * just enough to fill it, assuming that it has above a certain + * threshold of space available. If the space available is less than + * the threshold, we allocate a new page by writing a big tuple. + * + * By the time we get here, we know all the parameters passed in + * are valid, and that we hold the appropriate lock on the heap + * relation. + * + * Parameters: + * obj_desc: large object descriptor for which to append block. + * buf: buffer containing data to write. + * nbytes: amount to write + * + * Returns: + * number of bytes actually written to the new tuple. + */ +static int +inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes) +{ + Relation hr; + HeapTuple ntup; + Buffer buffer; + Page page; + int nblocks; + int nwritten; + + hr = obj_desc->heap_r; + + /* + * Get the last block in the relation. If there's no data in the + * relation at all, then we just get a new block. Otherwise, we check + * the last block to see whether it has room to accept some or all of + * the data that the user wants to write. If it doesn't, then we + * allocate a new block. + */ + + nblocks = RelationGetNumberOfBlocks(hr); + + if (nblocks > 0) + { + buffer = ReadBuffer(hr, nblocks - 1); + page = BufferGetPage(buffer); + } + else + { + buffer = ReadBuffer(hr, P_NEW); + page = BufferGetPage(buffer); + PageInit(page, BufferGetPageSize(buffer), 0); + } + + /* + * If the last page is too small to hold all the data, and it's too + * small to hold IMINBLK, then we allocate a new page. If it will + * hold at least IMINBLK, but less than all the data requested, then + * we write IMINBLK here. The caller is responsible for noticing that + * less than the requested number of bytes were written, and calling + * this routine again. + */ + + nwritten = IFREESPC(page); + if (nwritten < nbytes) + { + if (nwritten < IMINBLK) + { + ReleaseBuffer(buffer); + buffer = ReadBuffer(hr, P_NEW); + page = BufferGetPage(buffer); + PageInit(page, BufferGetPageSize(buffer), 0); + if (nbytes > IMAXBLK) + nwritten = IMAXBLK; + else + nwritten = nbytes; + } + } + else + nwritten = nbytes; + + /* + * Insert a new file system block tuple, index it, and write it out. + */ + + ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten); + inv_indextup(obj_desc, ntup); + heap_freetuple(ntup); + + /* new tuple is inserted */ + WriteBuffer(buffer); + + return nwritten; +} + +static int +inv_wrold(LargeObjectDesc *obj_desc, + char *dbuf, + int nbytes, + HeapTuple tuple, + Buffer buffer) +{ + Relation hr; + HeapTuple ntup; + Buffer newbuf; + Page page; + Page newpage; + int tupbytes; + Datum d; + struct varlena *fsblock; + int nwritten, + nblocks, + freespc; + bool isNull; + int keep_offset; + RetrieveIndexResult res; + + /* + * Since we're using a no-overwrite storage manager, the way we + * overwrite blocks is to mark the old block invalid and append a new + * block. First mark the old block invalid. This violates the tuple + * abstraction. + */ + + TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax)); + tuple->t_data->t_cmax = GetCurrentCommandId(); + tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID); + + /* + * If we're overwriting the entire block, we're lucky. All we need to + * do is to insert a new block. + */ + + if (obj_desc->offset == obj_desc->lowbyte + && obj_desc->lowbyte + nbytes >= obj_desc->highbyte) + { + WriteBuffer(buffer); + return inv_wrnew(obj_desc, dbuf, nbytes); + } + + /* + * By here, we need to overwrite part of the data in the current + * tuple. In order to reduce the degree to which we fragment blocks, + * we guarantee that no block will be broken up due to an overwrite. + * This means that we need to allocate a tuple on a new page, if + * there's not room for the replacement on this one. + */ + + newbuf = buffer; + page = BufferGetPage(buffer); + newpage = BufferGetPage(newbuf); + hr = obj_desc->heap_r; + freespc = IFREESPC(page); + d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull); + fsblock = (struct varlena *) DatumGetPointer(d); + tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len); + + if (freespc < tupbytes) + { + + /* + * First see if there's enough space on the last page of the table + * to put this tuple. + */ + + nblocks = RelationGetNumberOfBlocks(hr); + + if (nblocks > 0) + { + newbuf = ReadBuffer(hr, nblocks - 1); + newpage = BufferGetPage(newbuf); + } + else + { + newbuf = ReadBuffer(hr, P_NEW); + newpage = BufferGetPage(newbuf); + PageInit(newpage, BufferGetPageSize(newbuf), 0); + } + + freespc = IFREESPC(newpage); + + /* + * If there's no room on the last page, allocate a new last page + * for the table, and put it there. + */ + + if (freespc < tupbytes) + { + ReleaseBuffer(newbuf); + newbuf = ReadBuffer(hr, P_NEW); + newpage = BufferGetPage(newbuf); + PageInit(newpage, BufferGetPageSize(newbuf), 0); + } + } + + nwritten = nbytes; + if (nwritten > obj_desc->highbyte - obj_desc->offset + 1) + nwritten = obj_desc->highbyte - obj_desc->offset + 1; + memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte), + dbuf, nwritten); + + /* + * we are rewriting the entire old block, therefore we reset offset to + * the lowbyte of the original block before jumping into + * inv_newtuple() + */ + keep_offset = obj_desc->offset; + obj_desc->offset = obj_desc->lowbyte; + ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock), + tupbytes); + /* after we are done, we restore to the true offset */ + obj_desc->offset = keep_offset; + + /* + * By here, we have a page (newpage) that's guaranteed to have enough + * space on it to put the new tuple. Call inv_newtuple to do the + * work. Passing NULL as a buffer to inv_newtuple() keeps it from + * copying any data into the new tuple. When it returns, the tuple is + * ready to receive data from the old tuple and the user's data + * buffer. + */ +/* + ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes); + dptr = ((char *) ntup) + ntup->t_hoff - + (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) + + sizeof(int4) + + sizeof(fsblock->vl_len); + + if (obj_desc->offset > obj_desc->lowbyte) { + memmove(dptr, + &(fsblock->vl_dat[0]), + obj_desc->offset - obj_desc->lowbyte); + dptr += obj_desc->offset - obj_desc->lowbyte; + } + + + nwritten = nbytes; + if (nwritten > obj_desc->highbyte - obj_desc->offset + 1) + nwritten = obj_desc->highbyte - obj_desc->offset + 1; + + memmove(dptr, dbuf, nwritten); + dptr += nwritten; + + if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) { +*/ +/* + loc = (obj_desc->highbyte - obj_desc->offset) + + nwritten; + sz = obj_desc->highbyte - (obj_desc->lowbyte + loc); + + what's going on here?? - jolly +*/ +/* + sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten); + memmove(&(fsblock->vl_dat[0]), dptr, sz); + } +*/ + + + /* index the new tuple */ + inv_indextup(obj_desc, ntup); + heap_freetuple(ntup); + + /* + * move the scandesc forward so we don't reread the newly inserted + * tuple on the next index scan + */ + res = NULL; + if (obj_desc->iscan) + res = index_getnext(obj_desc->iscan, ForwardScanDirection); + + if (res) + pfree(res); + + /* + * Okay, by here, a tuple for the new block is correctly placed, + * indexed, and filled. Write the changed pages out. + */ + + WriteBuffer(buffer); + if (newbuf != buffer) + WriteBuffer(newbuf); + + /* Tuple id is no longer valid */ + ItemPointerSetInvalid(&(obj_desc->htid)); + + /* done */ + return nwritten; +} + +static HeapTuple +inv_newtuple(LargeObjectDesc *obj_desc, + Buffer buffer, + Page page, + char *dbuf, + int nwrite) +{ + HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData)); + PageHeader ph; + int tupsize; + int hoff; + Offset lower; + Offset upper; + ItemId itemId; + OffsetNumber off; + OffsetNumber limit; + char *attptr; + + /* compute tuple size -- no nulls */ + hoff = offsetof(HeapTupleHeaderData, t_bits); + hoff = MAXALIGN(hoff); + + /* add in olastbyte, varlena.vl_len, varlena.vl_dat */ + tupsize = hoff + (2 * sizeof(int32)) + nwrite; + tupsize = MAXALIGN(tupsize); + + /* + * Allocate the tuple on the page, violating the page abstraction. + * This code was swiped from PageAddItem(). + */ + + ph = (PageHeader) page; + limit = OffsetNumberNext(PageGetMaxOffsetNumber(page)); + + /* look for "recyclable" (unused & deallocated) ItemId */ + for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off)) + { + itemId = &ph->pd_linp[off - 1]; + if ((((*itemId).lp_flags & LP_USED) == 0) && + ((*itemId).lp_len == 0)) break; } - index_endscan(sd); - if (tuple.t_data == NULL) - return 0; - - (*found)++; - data = (Form_pg_largeobject) GETSTRUCT(&tuple); - off = obj_desc->offset % IBLKSIZE; - len = getbytealen(&(data->data)); + if (off > limit) + lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page)); + else if (off == limit) + lower = ph->pd_lower + sizeof(ItemIdData); + else + lower = ph->pd_lower; - if (len > IBLKSIZE) { - ReleaseBuffer(buffer); - elog(FATAL, "Internal error: len > IBLKSIZE"); - } + upper = ph->pd_upper - tupsize; - for (i=0; ipd_linp[off - 1]; + (*itemId).lp_off = upper; + (*itemId).lp_len = tupsize; + (*itemId).lp_flags = LP_USED; + ph->pd_lower = lower; + ph->pd_upper = upper; - i = 0; - { - char b[IBLKSIZE]; - int4 rest = len - off; + ntup->t_datamcxt = NULL; + ntup->t_data = (HeapTupleHeader) ((char *) page + upper); - memset(b, 0, IBLKSIZE); /* Can optimize later */ - if ((off > 0) && (len > 0)) /* We start in the middle of the tuple */ - memcpy(b, VARDATA(&(data->data)), (off > len) ? len : off); - - if ((nbytes <= rest) || (len == IBLKSIZE)) { - /* We will update inside existing tuple size */ - if (nbytes < rest) - n = rest; - else - n = nbytes; - memcpy(b + off, buf, n); - if (n < rest) /* There's a rest of the tuple left */ - memcpy(b + off + n, VARDATA(&(data->data)) + off + n, rest - n); - /* Update data only */ - replace[2] = 'r'; - values[2] = (Datum) _byteain(b, len); - } else { - /* We will extend tuple */ - /* Do we fit into max tuple size */ - if (nbytes <= (IBLKSIZE - off)) - len = off + nbytes; - else - len = IBLKSIZE; - n = len - off; - memcpy(b + off, buf, n); - /* Update data */ - replace[2] = 'r'; - values[2] = (Datum) _byteain(b, len); - } + /* + * Tuple is now allocated on the page. Next, fill in the tuple + * header. This block of code violates the tuple abstraction. + */ - newtup = heap_modifytuple(&tuple, obj_desc->heap_r, - values, nulls, replace); + ntup->t_len = tupsize; + ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off); + ntup->t_data->t_oid = newoid(); + TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin)); + ntup->t_data->t_cmin = GetCurrentCommandId(); + StoreInvalidTransactionId(&(ntup->t_data->t_xmax)); + ntup->t_data->t_cmax = 0; + ntup->t_data->t_infomask = HEAP_XMAX_INVALID; + ntup->t_data->t_natts = 2; + ntup->t_data->t_hoff = hoff; - heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL); - if (!IsIgnoringSystemIndexes()) { - CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs); - CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, newtup); - CatalogCloseIndices(Num_pg_largeobject_indices, idescs); + /* if a NULL is passed in, avoid the calculations below */ + if (dbuf == NULL) + return ntup; + + /* + * Finally, copy the user's data buffer into the tuple. This violates + * the tuple and class abstractions. + */ + + attptr = ((char *) ntup->t_data) + hoff; + *((int32 *) attptr) = obj_desc->offset + nwrite - 1; + attptr += sizeof(int32); + + /* + * * mer fixed disk layout of varlenas to get rid of the need for + * this. * + * + * ((int32 *) attptr) = nwrite + sizeof(int32); * attptr += + * sizeof(int32); + */ + + *((int32 *) attptr) = nwrite + sizeof(int32); + attptr += sizeof(int32); + + /* + * If a data buffer was passed in, then copy the data from the buffer + * to the tuple. Some callers (eg, inv_wrold()) may not pass in a + * buffer, since they have to copy part of the old tuple data and part + * of the user's new data into the new tuple. + */ + + if (dbuf != (char *) NULL) + memmove(attptr, dbuf, nwrite); + + /* keep track of boundary of current tuple */ + obj_desc->lowbyte = obj_desc->offset; + obj_desc->highbyte = obj_desc->offset + nwrite - 1; + + /* new tuple is filled -- return it */ + return ntup; +} + +static void +inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple) +{ + InsertIndexResult res; + Datum v[1]; + char n[1]; + + n[0] = ' '; + v[0] = Int32GetDatum(obj_desc->highbyte); + res = index_insert(obj_desc->index_r, &v[0], &n[0], + &(tuple->t_self), obj_desc->heap_r); + + if (res) + pfree(res); +} + +#ifdef NOT_USED + +static void +DumpPage(Page page, int blkno) +{ + ItemId lp; + HeapTuple tup; + int flags, i, nline; + ItemPointerData pointerData; + + printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0, + ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper, + ((PageHeader)page)->pd_special); + + printf("\t:MaxOffsetNumber=%d\n", + (int16) PageGetMaxOffsetNumber(page)); + + nline = (int16) PageGetMaxOffsetNumber(page); + +{ + int i; + char *cp; + + i = PageGetSpecialSize(page); + cp = PageGetSpecialPointer(page); + + printf("\t:SpecialData="); + + while (i > 0) { + printf(" 0x%02x", *cp); + cp += 1; + i -= 1; } - heap_freetuple(newtup); - } + printf("\n"); +} + for (i = 0; i < nline; i++) { + lp = ((PageHeader)page)->pd_linp + i; + flags = (*lp).lp_flags; + ItemPointerSet(&pointerData, blkno, 1 + i); + printf("%s:off=%d:flags=0x%x:len=%d", + ItemPointerFormExternal(&pointerData), (*lp).lp_off, + flags, (*lp).lp_len); + + if (flags & LP_USED) { + HeapTupleData htdata; + + printf(":USED"); + + memmove((char *) &htdata, + (char *) &((char *)page)[(*lp).lp_off], + sizeof(htdata)); + + tup = &htdata; + + printf("\n\t:ctid=%s:oid=%d", + ItemPointerFormExternal(&tup->t_ctid), + tup->t_oid); + printf(":natts=%d:thoff=%d:", + tup->t_natts, + tup->t_hoff); + + printf("\n\t:cmin=%u:", + tup->t_cmin); + + printf("xmin=%u:", tup->t_xmin); + + printf("\n\t:cmax=%u:", + tup->t_cmax); + + printf("xmax=%u:\n", tup->t_xmax); + + } else + putchar('\n'); + } +} + +static char* +ItemPointerFormExternal(ItemPointer pointer) +{ + static char itemPointerString[32]; + + if (!ItemPointerIsValid(pointer)) { + memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->"); + } else { + sprintf(itemPointerString, "<%u,%u>", + ItemPointerGetBlockNumber(pointer), + ItemPointerGetOffsetNumber(pointer)); + } + + return itemPointerString; +} + +#endif + +static int +_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln) +{ + IndexScanDesc iscan; + RetrieveIndexResult res; + HeapTupleData tuple; + Datum d; + long size; + bool isNull; + Buffer buffer; + + /* scan backwards from end */ + iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL); + + do + { + res = index_getnext(iscan, BackwardScanDirection); + + /* + * If there are no more index tuples, then the relation is empty, + * so the file's size is zero. + */ + + if (res == (RetrieveIndexResult) NULL) + { + index_endscan(iscan); + return 0; + } + + /* + * For time travel, we need to use the actual time qual here, + * rather that NowTimeQual. We currently have no way to pass a + * time qual in. + */ + tuple.t_self = res->heap_iptr; + heap_fetch(hreln, SnapshotNow, &tuple, &buffer); + pfree(res); + } while (tuple.t_data == NULL); + + /* don't need the index scan anymore */ + index_endscan(iscan); + + /* get olastbyte attribute */ + d = heap_getattr(&tuple, 1, hdesc, &isNull); + size = DatumGetInt32(d) + 1; ReleaseBuffer(buffer); - - return n; -} - -static int inv_write_append(LargeObjectDesc *obj_desc, char *buf, int nbytes) { - HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData)); - Relation idescs[Num_pg_largeobject_indices]; - Datum values[Natts_pg_largeobject]; - char nulls[Natts_pg_largeobject]; - int i; - uint32 len; - - for (i=0; iid); - len = (nbytes > IBLKSIZE) ? IBLKSIZE : nbytes; - - values[i++] = Int32GetDatum(obj_desc->offset / IBLKSIZE); - values[i++] = (Datum) _byteain(buf, len); - - ntup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls); - heap_insert(obj_desc->heap_r, ntup); - - if (!IsIgnoringSystemIndexes()) { - CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs); - CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, ntup); - CatalogCloseIndices(Num_pg_largeobject_indices, idescs); - } - - heap_freetuple(ntup); - - return len; -} - -static int inv_write_int(LargeObjectDesc *obj_desc, char *buf, int nbytes) { - int nwritten = 0; - int found = 0; - - if (nbytes == 0) - return 0; - - nwritten = inv_write_existing(obj_desc, buf, nbytes, &found); - if (found > 0) { - obj_desc->offset += nwritten; - return nwritten; - } - /* Looks like we are beyond the end of the file */ - nwritten = inv_write_append(obj_desc, buf, nbytes); - obj_desc->offset += nwritten; - return nwritten; -} - -static int count = 0; - -int -inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) { - int nwritten = 0; - while (nwritten < nbytes) - nwritten += inv_write_int(obj_desc, buf + nwritten, nbytes - nwritten); - - return nwritten; + + return size; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index debf1f2626e..3e4ba6c72a3 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -22,7 +22,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.171 2000/10/21 15:55:26 momjian Exp $ + * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.172 2000/10/22 05:27:18 momjian Exp $ * * Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb * @@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal) fprintf(stderr, "%s saving BLOBs\n", g_comment_start); /* Cursor to get all BLOB tables */ - appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT loid from pg_largeobject"); + appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT); res = PQexec(g_conn, oidQry->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) diff --git a/src/include/catalog/catname.h b/src/include/catalog/catname.h index cb95771147d..b82977d806c 100644 --- a/src/include/catalog/catname.h +++ b/src/include/catalog/catname.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: catname.h,v 1.15 2000/10/21 15:55:28 momjian Exp $ + * $Id: catname.h,v 1.16 2000/10/22 05:27:20 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -29,7 +29,6 @@ #define InheritsRelationName "pg_inherits" #define InheritancePrecidenceListRelationName "pg_ipl" #define LanguageRelationName "pg_language" -#define LargeobjectRelationName "pg_largeobject" #define ListenerRelationName "pg_listener" #define LogRelationName "pg_log" #define OperatorClassRelationName "pg_opclass" diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index 7bee3e0f039..6cc98bdc322 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: indexing.h,v 1.43 2000/10/21 15:55:28 momjian Exp $ + * $Id: indexing.h,v 1.44 2000/10/22 05:27:20 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -31,7 +31,6 @@ #define Num_pg_index_indices 2 #define Num_pg_inherits_indices 1 #define Num_pg_language_indices 2 -#define Num_pg_largeobject_indices 2 #define Num_pg_listener_indices 1 #define Num_pg_opclass_indices 2 #define Num_pg_operator_indices 2 @@ -63,8 +62,6 @@ #define InheritsRelidSeqnoIndex "pg_inherits_relid_seqno_index" #define LanguageNameIndex "pg_language_name_index" #define LanguageOidIndex "pg_language_oid_index" -#define LargeobjectLOIdIndex "pg_largeobject_loid_index" -#define LargeobjectLOIdPNIndex "pg_largeobject_loid_pn_index" #define ListenerPidRelnameIndex "pg_listener_pid_relname_index" #define OpclassDeftypeIndex "pg_opclass_deftype_index" #define OpclassNameIndex "pg_opclass_name_index" @@ -95,7 +92,6 @@ extern char *Name_pg_group_indices[]; extern char *Name_pg_index_indices[]; extern char *Name_pg_inherits_indices[]; extern char *Name_pg_language_indices[]; -extern char *Name_pg_largeobject_indices[]; extern char *Name_pg_listener_indices[]; extern char *Name_pg_opclass_indices[]; extern char *Name_pg_operator_indices[]; @@ -195,8 +191,6 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops)); DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops)); DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops)); -DECLARE_INDEX(pg_largeobject_loid_index on pg_largeobject using btree(loid oid_ops)); -DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops)); DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops)); /* This column needs to allow multiple zero entries, but is in the cache */ DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops)); diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h deleted file mode 100644 index 409aaf8d226..00000000000 --- a/src/include/catalog/pg_largeobject.h +++ /dev/null @@ -1,63 +0,0 @@ -/*------------------------------------------------------------------------- - * - * pg_largeobject.h - * definition of the system "largeobject" relation (pg_largeobject) - * along with the relation's initial contents. - * - * - * Portions Copyright (c) 1996-2000, PostgreSQL, Inc - * Portions Copyright (c) 1994, Regents of the University of California - * - * $Id: pg_largeobject.h,v 1.3 2000/10/21 15:55:28 momjian Exp $ - * - * NOTES - * the genbki.sh script reads this file and generates .bki - * information from the DATA() statements. - * - *------------------------------------------------------------------------- - */ -#ifndef PG_LARGEOBJECT_H -#define PG_LARGEOBJECT_H - -/* ---------------- - * postgres.h contains the system type definintions and the - * CATALOG(), BOOTSTRAP and DATA() sugar words so this file - * can be read by both genbki.sh and the C compiler. - * ---------------- - */ - -/* ---------------- - * pg_largeobject definition. cpp turns this into - * typedef struct FormData_pg_largeobject. Large object id - * is stored in loid; - * ---------------- - */ - -CATALOG(pg_largeobject) -{ - Oid loid; - int4 pageno; - bytea data; -} FormData_pg_largeobject; - -/* ---------------- - * Form_pg_largeobject corresponds to a pointer to a tuple with - * the format of pg_largeobject relation. - * ---------------- - */ -typedef FormData_pg_largeobject *Form_pg_largeobject; - -/* ---------------- - * compiler constants for pg_largeobject - * ---------------- - */ -#define Natts_pg_largeobject 3 -#define Anum_pg_largeobject_loid 1 -#define Anum_pg_largeobject_pageno 2 -#define Anum_pg_largeobject_data 3 - -Oid LargeobjectCreate(Oid loid); -void LargeobjectDrop(Oid loid); -int LargeobjectFind(Oid loid); - -#endif /* PG_LARGEOBJECT_H */ diff --git a/src/include/storage/large_object.h b/src/include/storage/large_object.h index 77990c56335..c480f5b7874 100644 --- a/src/include/storage/large_object.h +++ b/src/include/storage/large_object.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: large_object.h,v 1.16 2000/10/21 15:55:29 momjian Exp $ + * $Id: large_object.h,v 1.17 2000/10/22 05:27:23 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -22,11 +22,17 @@ /* * This structure will eventually have lots more stuff associated with it. */ -typedef struct LargeObjectDesc { - Relation heap_r; - Relation index_r; +typedef struct LargeObjectDesc +{ + Relation heap_r; /* heap relation */ + Relation index_r; /* index relation on seqno attribute */ + IndexScanDesc iscan; /* index scan we're using */ + TupleDesc hdesc; /* heap relation tuple desc */ + TupleDesc idesc; /* index relation tuple desc */ + uint32 lowbyte; /* low byte on the current page */ + uint32 highbyte; /* high byte on the current page */ uint32 offset; /* current seek pointer */ - Oid id; + ItemPointerData htid; /* tid of current heap tuple */ #define IFS_RDLOCK (1 << 0) #define IFS_WRLOCK (1 << 1) @@ -49,4 +55,7 @@ extern int inv_tell(LargeObjectDesc *obj_desc); extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes); extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes); +/* added for buffer leak prevention [ PA ] */ +extern void inv_cleanindex(LargeObjectDesc *obj_desc); + #endif /* LARGE_OBJECT_H */