1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Add lo_truncate() to backend and libpq for large object truncation.

Kris Jurka
This commit is contained in:
Bruce Momjian
2007-03-03 19:52:47 +00:00
parent 90d76525c5
commit 0763a56501
12 changed files with 372 additions and 26 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/libpq/be-fsstubs.c,v 1.85 2007/02/27 23:48:07 tgl Exp $
* $PostgreSQL: pgsql/src/backend/libpq/be-fsstubs.c,v 1.86 2007/03/03 19:52:46 momjian Exp $
*
* NOTES
* This should be moved to a more appropriate place. It is here
@ -120,12 +120,10 @@ lo_close(PG_FUNCTION_ARGS)
int32 fd = PG_GETARG_INT32(0);
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
PG_RETURN_INT32(-1);
}
#if FSDB
elog(DEBUG4, "lo_close(%d)", fd);
#endif
@ -152,12 +150,9 @@ lo_read(int fd, char *buf, int len)
int status;
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
return -1;
}
status = inv_read(cookies[fd], buf, len);
@ -170,12 +165,9 @@ lo_write(int fd, const char *buf, int len)
int status;
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
return -1;
}
if ((cookies[fd]->flags & IFS_WRLOCK) == 0)
ereport(ERROR,
@ -198,12 +190,9 @@ lo_lseek(PG_FUNCTION_ARGS)
int status;
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
PG_RETURN_INT32(-1);
}
status = inv_seek(cookies[fd], offset, whence);
@ -248,12 +237,9 @@ lo_tell(PG_FUNCTION_ARGS)
int32 fd = PG_GETARG_INT32(0);
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
PG_RETURN_INT32(-1);
}
PG_RETURN_INT32(inv_tell(cookies[fd]));
}
@ -467,6 +453,26 @@ lo_export(PG_FUNCTION_ARGS)
PG_RETURN_INT32(1);
}
/*
* lo_truncate -
* truncate a large object to a specified length
*/
Datum
lo_truncate(PG_FUNCTION_ARGS)
{
int32 fd = PG_GETARG_INT32(0);
int32 len = PG_GETARG_INT32(1);
if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("invalid large-object descriptor: %d", fd)));
inv_truncate(cookies[fd], len);
PG_RETURN_INT32(0);
}
/*
* AtEOXact_LargeObject -
* prepares large objects for transaction commit

View File

@ -17,7 +17,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.122 2007/02/27 23:48:07 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.123 2007/03/03 19:52:46 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@ -681,3 +681,166 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
return nwritten;
}
void
inv_truncate(LargeObjectDesc *obj_desc, int len)
{
int32 pageno = (int32) (len / LOBLKSIZE);
int off;
ScanKeyData skey[2];
IndexScanDesc sd;
HeapTuple oldtuple;
Form_pg_largeobject olddata;
struct
{
bytea hdr;
char data[LOBLKSIZE];
} workbuf;
char *workb = VARDATA(&workbuf.hdr);
HeapTuple newtup;
Datum values[Natts_pg_largeobject];
char nulls[Natts_pg_largeobject];
char replace[Natts_pg_largeobject];
CatalogIndexState indstate;
Assert(PointerIsValid(obj_desc));
/* enforce writability because snapshot is probably wrong otherwise */
if ((obj_desc->flags & IFS_WRLOCK) == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("large object %u was not opened for writing",
obj_desc->id)));
open_lo_relation();
indstate = CatalogOpenIndexes(lo_heap_r);
ScanKeyInit(&skey[0],
Anum_pg_largeobject_loid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(obj_desc->id));
ScanKeyInit(&skey[1],
Anum_pg_largeobject_pageno,
BTGreaterEqualStrategyNumber, F_INT4GE,
Int32GetDatum(pageno));
sd = index_beginscan(lo_heap_r, lo_index_r,
obj_desc->snapshot, 2, skey);
/*
* If possible, get the page the truncation point is in.
* The truncation point may be beyond the end of the LO or
* in a hole.
*/
olddata = NULL;
if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
{
olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
Assert(olddata->pageno >= pageno);
}
/*
* If we found the page of the truncation point we need to
* truncate the data in it. Otherwise if we're in a hole,
* we need to create a page to mark the end of data.
*/
if (olddata != NULL && olddata->pageno == pageno)
{
/* First, load old data into workbuf */
bytea *datafield = &(olddata->data);
bool pfreeit = false;
int pagelen;
if (VARATT_IS_EXTENDED(datafield))
{
datafield = (bytea *)
heap_tuple_untoast_attr((varattrib *) datafield);
pfreeit = true;
}
pagelen = getbytealen(datafield);
Assert(pagelen <= LOBLKSIZE);
memcpy(workb, VARDATA(datafield), pagelen);
if (pfreeit)
pfree(datafield);
/*
* Fill any hole
*/
off = len % LOBLKSIZE;
if (off > pagelen)
MemSet(workb + pagelen, 0, off - pagelen);
/* compute length of new page */
SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
/*
* Form and insert updated tuple
*/
memset(values, 0, sizeof(values));
memset(nulls, ' ', sizeof(nulls));
memset(replace, ' ', sizeof(replace));
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
replace[Anum_pg_largeobject_data - 1] = 'r';
newtup = heap_modifytuple(oldtuple, RelationGetDescr(lo_heap_r),
values, nulls, replace);
simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
CatalogIndexInsert(indstate, newtup);
heap_freetuple(newtup);
}
else
{
/*
* If the first page we found was after the truncation
* point, we're in a hole that we'll fill, but we need to
* delete the later page.
*/
if (olddata != NULL && olddata->pageno > pageno)
simple_heap_delete(lo_heap_r, &oldtuple->t_self);
/*
* Write a brand new page.
*
* Fill the hole up to the truncation point
*/
off = len % LOBLKSIZE;
if (off > 0)
MemSet(workb, 0, off);
/* compute length of new page */
SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
/*
* Form and insert new tuple
*/
memset(values, 0, sizeof(values));
memset(nulls, ' ', sizeof(nulls));
values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
simple_heap_insert(lo_heap_r, newtup);
CatalogIndexInsert(indstate, newtup);
heap_freetuple(newtup);
}
/*
* Delete any pages after the truncation point
*/
while ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
{
simple_heap_delete(lo_heap_r, &oldtuple->t_self);
}
index_endscan(sd);
CatalogCloseIndexes(indstate);
/*
* Advance command counter so that tuple updates will be seen by later
* large-object operations in this transaction.
*/
CommandCounterIncrement();
}