1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-21 16:02:15 +03:00

Change the relation_open protocol so that we obtain lock on a relation

(table or index) before trying to open its relcache entry.  This fixes
race conditions in which someone else commits a change to the relation's
catalog entries while we are in process of doing relcache load.  Problems
of that ilk have been reported sporadically for years, but it was not
really practical to fix until recently --- for instance, the recent
addition of WAL-log support for in-place updates helped.

Along the way, remove pg_am.amconcurrent: all AMs are now expected to support
concurrent update.
This commit is contained in:
Tom Lane
2006-07-31 20:09:10 +00:00
parent 4cd72b53b9
commit 09d3670df3
40 changed files with 635 additions and 648 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.217 2006/07/14 14:52:17 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.218 2006/07/31 20:08:59 tgl Exp $
*
*
* INTERFACE ROUTINES
@ -51,6 +51,7 @@
#include "pgstat.h"
#include "storage/procarray.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
@ -687,15 +688,16 @@ relation_open(Oid relationId, LOCKMODE lockmode)
Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
/* Get the lock before trying to open the relcache entry */
if (lockmode != NoLock)
LockRelationOid(relationId, lockmode);
/* The relcache does all the real work... */
r = RelationIdGetRelation(relationId);
if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId);
if (lockmode != NoLock)
LockRelation(r, lockmode);
return r;
}
@ -713,26 +715,38 @@ conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait)
Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
/* Get the lock before trying to open the relcache entry */
if (lockmode != NoLock)
{
if (nowait)
{
if (!ConditionalLockRelationOid(relationId, lockmode))
{
/* try to throw error by name; relation could be deleted... */
char *relname = get_rel_name(relationId);
if (relname)
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relname)));
else
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation with OID %u",
relationId)));
}
}
else
LockRelationOid(relationId, lockmode);
}
/* The relcache does all the real work... */
r = RelationIdGetRelation(relationId);
if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId);
if (lockmode != NoLock)
{
if (nowait)
{
if (!ConditionalLockRelation(r, lockmode))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
RelationGetRelationName(r))));
}
else
LockRelation(r, lockmode);
}
return r;
}
@ -749,12 +763,12 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
/*
* Check for shared-cache-inval messages before trying to open the
* relation. This is needed to cover the case where the name identifies a
* rel that has been dropped and recreated since the start of our
* relation. This is needed to cover the case where the name identifies
* a rel that has been dropped and recreated since the start of our
* transaction: if we don't flush the old syscache entry then we'll latch
* onto that entry and suffer an error when we do LockRelation. Note that
* relation_open does not need to do this, since a relation's OID never
* changes.
* onto that entry and suffer an error when we do RelationIdGetRelation.
* Note that relation_open does not need to do this, since a relation's
* OID never changes.
*
* We skip this if asked for NoLock, on the assumption that the caller has
* already ensured some appropriate lock is held.
@ -772,7 +786,7 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
/* ----------------
* relation_close - close any relation
*
* If lockmode is not "NoLock", we first release the specified lock.
* If lockmode is not "NoLock", we then release the specified lock.
*
* Note that it is often sensible to hold a lock beyond relation_close;
* in that case, the lock is released automatically at xact end.
@ -781,13 +795,15 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
void
relation_close(Relation relation, LOCKMODE lockmode)
{
Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
LockRelId relid = relation->rd_lockInfo.lockRelId;
if (lockmode != NoLock)
UnlockRelation(relation, lockmode);
Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
/* The relcache does the real work... */
RelationClose(relation);
if (lockmode != NoLock)
UnlockRelationId(&relid, lockmode);
}

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.62 2006/07/14 14:52:17 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.63 2006/07/31 20:08:59 tgl Exp $
*
*
* INTERFACE ROUTINES
@ -1004,7 +1004,7 @@ toast_save_datum(Relation rel, Datum value)
*/
toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
toasttupDesc = toastrel->rd_att;
toastidx = index_open(toastrel->rd_rel->reltoastidxid);
toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
/*
* Create the varattrib reference
@ -1043,12 +1043,6 @@ toast_save_datum(Relation rel, Datum value)
data_p = VARATT_DATA(value);
data_todo = VARATT_SIZE(value) - VARHDRSZ;
/*
* We must explicitly lock the toast index because we aren't using an
* index scan here.
*/
LockRelation(toastidx, RowExclusiveLock);
/*
* Split up the item into chunks
*/
@ -1098,8 +1092,7 @@ toast_save_datum(Relation rel, Datum value)
/*
* Done - close toast relation and return the reference
*/
UnlockRelation(toastidx, RowExclusiveLock);
index_close(toastidx);
index_close(toastidx, RowExclusiveLock);
heap_close(toastrel, RowExclusiveLock);
return PointerGetDatum(result);
@ -1130,7 +1123,7 @@ toast_delete_datum(Relation rel, Datum value)
*/
toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
RowExclusiveLock);
toastidx = index_open(toastrel->rd_rel->reltoastidxid);
toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
/*
* Setup a scan key to fetch from the index by va_valueid (we don't
@ -1144,7 +1137,7 @@ toast_delete_datum(Relation rel, Datum value)
/*
* Find the chunks by index
*/
toastscan = index_beginscan(toastrel, toastidx, true,
toastscan = index_beginscan(toastrel, toastidx,
SnapshotToast, 1, &toastkey);
while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
{
@ -1158,7 +1151,7 @@ toast_delete_datum(Relation rel, Datum value)
* End scan and close relations
*/
index_endscan(toastscan);
index_close(toastidx);
index_close(toastidx, RowExclusiveLock);
heap_close(toastrel, RowExclusiveLock);
}
@ -1202,7 +1195,7 @@ toast_fetch_datum(varattrib *attr)
toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
AccessShareLock);
toasttupDesc = toastrel->rd_att;
toastidx = index_open(toastrel->rd_rel->reltoastidxid);
toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
/*
* Setup a scan key to fetch from the index by va_valueid
@ -1221,7 +1214,7 @@ toast_fetch_datum(varattrib *attr)
*/
nextidx = 0;
toastscan = index_beginscan(toastrel, toastidx, true,
toastscan = index_beginscan(toastrel, toastidx,
SnapshotToast, 1, &toastkey);
while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
{
@ -1282,7 +1275,7 @@ toast_fetch_datum(varattrib *attr)
* End scan and close relations
*/
index_endscan(toastscan);
index_close(toastidx);
index_close(toastidx, AccessShareLock);
heap_close(toastrel, AccessShareLock);
return result;
@ -1355,7 +1348,7 @@ toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
AccessShareLock);
toasttupDesc = toastrel->rd_att;
toastidx = index_open(toastrel->rd_rel->reltoastidxid);
toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
/*
* Setup a scan key to fetch from the index. This is either two keys or
@ -1396,7 +1389,7 @@ toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
* The index is on (valueid, chunkidx) so they will come in order
*/
nextidx = startchunk;
toastscan = index_beginscan(toastrel, toastidx, true,
toastscan = index_beginscan(toastrel, toastidx,
SnapshotToast, nscankeys, toastkey);
while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
{
@ -1461,7 +1454,7 @@ toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
* End scan and close relations
*/
index_endscan(toastscan);
index_close(toastidx);
index_close(toastidx, AccessShareLock);
heap_close(toastrel, AccessShareLock);
return result;