1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Make DROP TABLE rollback-able: postpone physical file delete until commit.

(WAL logging for this is not done yet, however.)  Clean up a number of really
crufty things that are no longer needed now that DROP behaves nicely.  Make
temp table mapper do the right things when drop or rename affecting a temp
table is rolled back.  Also, remove "relation modified while in use" error
check, in favor of locking tables at first reference and holding that lock
throughout the statement.
This commit is contained in:
Tom Lane
2000-11-08 22:10:03 +00:00
parent ebe0b23690
commit 3908473c80
46 changed files with 1305 additions and 1187 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.92 2000/10/28 16:20:55 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.93 2000/11/08 22:09:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -709,23 +709,28 @@ refcount = %ld, file: %s, line: %d\n",
#endif
/*
* FlushBuffer -- like WriteBuffer, but force the page to disk.
* FlushBuffer -- like WriteBuffer, but write the page immediately,
* rather than just marking it dirty. On success return, the buffer will
* no longer be dirty.
*
* 'buffer' is known to be dirty/pinned, so there should not be a
* problem reading the BufferDesc members without the BufMgrLock
* (nobody should be able to change tags out from under us).
*
* Unpin if 'release' is TRUE.
* If 'sync' is true, a synchronous write is wanted (wait for buffer to hit
* the disk). Otherwise it's sufficient to issue the kernel write call.
*
* Unpin buffer if 'release' is true.
*/
int
FlushBuffer(Buffer buffer, bool release)
FlushBuffer(Buffer buffer, bool sync, bool release)
{
BufferDesc *bufHdr;
Relation bufrel;
int status;
if (BufferIsLocal(buffer))
return FlushLocalBuffer(buffer, release) ? STATUS_OK : STATUS_ERROR;
return FlushLocalBuffer(buffer, sync, release) ? STATUS_OK : STATUS_ERROR;
if (BAD_BUFFER_ID(buffer))
return STATUS_ERROR;
@@ -755,12 +760,16 @@ FlushBuffer(Buffer buffer, bool release)
*/
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE);
status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
if (sync)
status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
else
status = smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK);
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
/* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
RelationDecrementReferenceCount(bufrel);
if (status == SM_FAIL)
@@ -926,7 +935,7 @@ SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
/*
* drop relcache refcnt incremented by
* RelationIdCacheGetRelation
* RelationNodeCacheGetRelation
*/
RelationDecrementReferenceCount(reln);
}
@@ -1123,7 +1132,7 @@ BufferSync()
bufHdr->flags &= ~BM_DIRTY;
}
/* drop refcnt obtained by RelationIdCacheGetRelation */
/* drop refcnt obtained by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
}
@@ -1154,7 +1163,7 @@ BufferSync()
/*
* drop relcache refcnt incremented by
* RelationIdCacheGetRelation
* RelationNodeCacheGetRelation
*/
RelationDecrementReferenceCount(reln);
@@ -1458,7 +1467,7 @@ BufferReplace(BufferDesc *bufHdr)
SpinAcquire(BufMgrLock);
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
/* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
@@ -1495,21 +1504,23 @@ RelationGetNumberOfBlocks(Relation relation)
}
/* ---------------------------------------------------------------------
* ReleaseRelationBuffers
* DropRelationBuffers
*
* This function removes all the buffered pages for a relation
* from the buffer pool. Dirty pages are simply dropped, without
* bothering to write them out first. This is used when the
* relation is about to be deleted. We assume that the caller
* holds an exclusive lock on the relation, which should assure
* that no new buffers will be acquired for the rel meanwhile.
* bothering to write them out first. This is NOT rollback-able,
* and so should be used only with extreme caution!
*
* We assume that the caller holds an exclusive lock on the relation,
* which should assure that no new buffers will be acquired for the rel
* meanwhile.
*
* XXX currently it sequentially searches the buffer pool, should be
* changed to more clever ways of searching.
* --------------------------------------------------------------------
*/
void
ReleaseRelationBuffers(Relation rel)
DropRelationBuffers(Relation rel)
{
int i;
BufferDesc *bufHdr;
@@ -1589,7 +1600,104 @@ recheck:
* this rel, since we hold exclusive lock on this rel.
*/
if (RelFileNodeEquals(rel->rd_node,
BufferTagLastDirtied[i - 1].rnode))
BufferTagLastDirtied[i - 1].rnode))
BufferDirtiedByMe[i - 1] = false;
}
SpinRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
* DropRelFileNodeBuffers
*
* This is the same as DropRelationBuffers, except that the target
* relation is specified by RelFileNode.
*
* This is NOT rollback-able. One legitimate use is to clear the
* buffer cache of buffers for a relation that is being deleted
* during transaction abort.
* --------------------------------------------------------------------
*/
void
DropRelFileNodeBuffers(RelFileNode rnode)
{
int i;
BufferDesc *bufHdr;
/* We have to search both local and shared buffers... */
for (i = 0; i < NLocBuffer; i++)
{
bufHdr = &LocalBufferDescriptors[i];
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
{
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
LocalRefCount[i] = 0;
bufHdr->tag.rnode.relNode = InvalidOid;
}
}
SpinAcquire(BufMgrLock);
for (i = 1; i <= NBuffers; i++)
{
bufHdr = &BufferDescriptors[i - 1];
recheck:
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
{
/*
* If there is I/O in progress, better wait till it's done;
* don't want to delete the relation out from under someone
* who's just trying to flush the buffer!
*/
if (bufHdr->flags & BM_IO_IN_PROGRESS)
{
WaitIO(bufHdr, BufMgrLock);
/*
* By now, the buffer very possibly belongs to some other
* rel, so check again before proceeding.
*/
goto recheck;
}
/* Now we can do what we came for */
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
/*
* Release any refcount we may have.
*
* This is very probably dead code, and if it isn't then it's
* probably wrong. I added the Assert to find out --- tgl
* 11/99.
*/
if (!(bufHdr->flags & BM_FREE))
{
/* Assert checks that buffer will actually get freed! */
Assert(PrivateRefCount[i - 1] == 1 &&
bufHdr->refcount == 1);
/* ReleaseBuffer expects we do not hold the lock at entry */
SpinRelease(BufMgrLock);
ReleaseBuffer(i);
SpinAcquire(BufMgrLock);
}
/*
* And mark the buffer as no longer occupied by this rel.
*/
BufTableDelete(bufHdr);
}
/*
* Also check to see if BufferDirtiedByMe info for this buffer
* refers to the target relation, and clear it if so. This is
* independent of whether the current contents of the buffer
* belong to the target relation!
*
* NOTE: we have no way to clear BufferDirtiedByMe info in other
* backends, but hopefully there are none with that bit set for
* this rel, since we hold exclusive lock on this rel.
*/
if (RelFileNodeEquals(rnode,
BufferTagLastDirtied[i - 1].rnode))
BufferDirtiedByMe[i - 1] = false;
}
@@ -1604,7 +1712,7 @@ recheck:
* bothering to write them out first. This is used when we destroy a
* database, to avoid trying to flush data to disk when the directory
* tree no longer exists. Implementation is pretty similar to
* ReleaseRelationBuffers() which is for destroying just one relation.
* DropRelationBuffers() which is for destroying just one relation.
* --------------------------------------------------------------------
*/
void
@@ -1757,33 +1865,32 @@ BufferPoolBlowaway()
/* ---------------------------------------------------------------------
* FlushRelationBuffers
*
* This function flushes all dirty pages of a relation out to disk.
* This function writes all dirty pages of a relation out to disk.
* Furthermore, pages that have blocknumber >= firstDelBlock are
* actually removed from the buffer pool. An error code is returned
* if we fail to dump a dirty buffer or if we find one of
* the target pages is pinned into the cache.
*
* This is used by VACUUM before truncating the relation to the given
* number of blocks. (TRUNCATE TABLE also uses it in the same way.)
* It might seem unnecessary to flush dirty pages before firstDelBlock,
* since VACUUM should already have committed its changes. However,
* it is possible for there still to be dirty pages: if some page
* had unwritten on-row tuple status updates from a prior transaction,
* and VACUUM had no additional changes to make to that page, then
* VACUUM won't have written it. This is harmless in most cases but
* will break pg_upgrade, which relies on VACUUM to ensure that *all*
* tuples have correct on-row status. So, we check and flush all
* dirty pages of the rel regardless of block number.
* This is called by DROP TABLE to clear buffers for the relation
* from the buffer pool. Note that we must write dirty buffers,
* rather than just dropping the changes, because our transaction
* might abort later on; we want to roll back safely in that case.
*
* This is also used by RENAME TABLE (with firstDelBlock = 0)
* to clear out the buffer cache before renaming the physical files of
* a relation. Without that, some other backend might try to do a
* blind write of a buffer page (relying on the BlindId of the buffer)
* and fail because it's not got the right filename anymore.
* This is also called by VACUUM before truncating the relation to the
* given number of blocks. It might seem unnecessary for VACUUM to
* write dirty pages before firstDelBlock, since VACUUM should already
* have committed its changes. However, it is possible for there still
* to be dirty pages: if some page had unwritten on-row tuple status
* updates from a prior transaction, and VACUUM had no additional
* changes to make to that page, then VACUUM won't have written it.
* This is harmless in most cases but will break pg_upgrade, which
* relies on VACUUM to ensure that *all* tuples have correct on-row
* status. So, we check and flush all dirty pages of the rel
* regardless of block number.
*
* In all cases, the caller should be holding AccessExclusiveLock on
* the target relation to ensure that no other backend is busy reading
* more blocks of the relation.
* more blocks of the relation (or might do so before we commit).
*
* Formerly, we considered it an error condition if we found dirty
* buffers here. However, since BufferSync no longer forces out all
@@ -1812,7 +1919,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
{
if (bufHdr->flags & BM_DIRTY)
{
if (FlushBuffer(-i - 1, false) != STATUS_OK)
if (FlushBuffer(-i - 1, false, false) != STATUS_OK)
{
elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
RelationGetRelationName(rel), firstDelBlock,
@@ -1840,15 +1947,17 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
for (i = 0; i < NBuffers; i++)
{
bufHdr = &BufferDescriptors[i];
recheck:
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
{
if (bufHdr->flags & BM_DIRTY)
{
PinBuffer(bufHdr);
SpinRelease(BufMgrLock);
if (FlushBuffer(i + 1, true) != STATUS_OK)
if (FlushBuffer(i + 1, false, false) != STATUS_OK)
{
SpinAcquire(BufMgrLock);
UnpinBuffer(bufHdr);
SpinRelease(BufMgrLock);
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it",
RelationGetRelationName(rel), firstDelBlock,
bufHdr->tag.blockNum,
@@ -1856,12 +1965,7 @@ recheck:
return -1;
}
SpinAcquire(BufMgrLock);
/*
* Buffer could already be reassigned, so must recheck
* whether it still belongs to rel before freeing it!
*/
goto recheck;
UnpinBuffer(bufHdr);
}
if (!(bufHdr->flags & BM_FREE))
{

View File

@@ -16,7 +16,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.33 2000/10/28 16:20:56 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.34 2000/11/08 22:09:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -183,7 +183,7 @@ WriteLocalBuffer(Buffer buffer, bool release)
* flushes a local buffer
*/
int
FlushLocalBuffer(Buffer buffer, bool release)
FlushLocalBuffer(Buffer buffer, bool sync, bool release)
{
int bufid;
Relation bufrel;
@@ -199,13 +199,18 @@ FlushLocalBuffer(Buffer buffer, bool release)
bufHdr = &LocalBufferDescriptors[bufid];
bufHdr->flags &= ~BM_DIRTY;
bufrel = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
Assert(bufrel != NULL);
smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
if (sync)
smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
else
smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
LocalBufferFlushCount++;
/* drop relcache refcount incremented by RelationIdCacheGetRelation */
/* drop relcache refcount incremented by RelationNodeCacheGetRelation */
RelationDecrementReferenceCount(bufrel);
if (release)

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.2 2000/11/08 22:09:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -838,7 +838,7 @@ BufferSync()
SpinRelease(BufMgrLock);
/* drop refcnt obtained by RelationIdCacheGetRelation */
/* drop refcnt obtained by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
{
RelationDecrementReferenceCount(reln);
@@ -1128,7 +1128,7 @@ BufferReplace(BufferDesc *bufHdr)
false); /* no fsync */
}
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
/* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
@@ -1159,21 +1159,23 @@ RelationGetNumberOfBlocks(Relation relation)
}
/* ---------------------------------------------------------------------
* ReleaseRelationBuffers
* DropRelationBuffers
*
* This function removes all the buffered pages for a relation
* from the buffer pool. Dirty pages are simply dropped, without
* bothering to write them out first. This is used when the
* relation is about to be deleted. We assume that the caller
* holds an exclusive lock on the relation, which should assure
* that no new buffers will be acquired for the rel meanwhile.
* bothering to write them out first. This is NOT rollback-able,
* and so should be used only with extreme caution!
*
* We assume that the caller holds an exclusive lock on the relation,
* which should assure that no new buffers will be acquired for the rel
* meanwhile.
*
* XXX currently it sequentially searches the buffer pool, should be
* changed to more clever ways of searching.
* --------------------------------------------------------------------
*/
void
ReleaseRelationBuffers(Relation rel)
DropRelationBuffers(Relation rel)
{
int i;
BufferDesc *bufHdr;
@@ -1248,6 +1250,91 @@ recheck:
SpinRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
* DropRelFileNodeBuffers
*
* This is the same as DropRelationBuffers, except that the target
* relation is specified by RelFileNode.
*
* This is NOT rollback-able. One legitimate use is to clear the
* buffer cache of buffers for a relation that is being deleted
* during transaction abort.
* --------------------------------------------------------------------
*/
void
DropRelFileNodeBuffers(RelFileNode rnode)
{
int i;
BufferDesc *bufHdr;
/* We have to search both local and shared buffers... */
for (i = 0; i < NLocBuffer; i++)
{
bufHdr = &LocalBufferDescriptors[i];
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
{
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
bufHdr->cntxDirty = false;
LocalRefCount[i] = 0;
bufHdr->tag.rnode.relNode = InvalidOid;
}
}
SpinAcquire(BufMgrLock);
for (i = 1; i <= NBuffers; i++)
{
bufHdr = &BufferDescriptors[i - 1];
recheck:
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
{
/*
* If there is I/O in progress, better wait till it's done;
* don't want to delete the relation out from under someone
* who's just trying to flush the buffer!
*/
if (bufHdr->flags & BM_IO_IN_PROGRESS)
{
WaitIO(bufHdr, BufMgrLock);
/*
* By now, the buffer very possibly belongs to some other
* rel, so check again before proceeding.
*/
goto recheck;
}
/* Now we can do what we came for */
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
bufHdr->cntxDirty = false;
/*
* Release any refcount we may have.
*
* This is very probably dead code, and if it isn't then it's
* probably wrong. I added the Assert to find out --- tgl
* 11/99.
*/
if (!(bufHdr->flags & BM_FREE))
{
/* Assert checks that buffer will actually get freed! */
Assert(PrivateRefCount[i - 1] == 1 &&
bufHdr->refcount == 1);
/* ReleaseBuffer expects we do not hold the lock at entry */
SpinRelease(BufMgrLock);
ReleaseBuffer(i);
SpinAcquire(BufMgrLock);
}
/*
* And mark the buffer as no longer occupied by this rel.
*/
BufTableDelete(bufHdr);
}
}
SpinRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
* DropBuffers
*
@@ -1256,7 +1343,7 @@ recheck:
* bothering to write them out first. This is used when we destroy a
* database, to avoid trying to flush data to disk when the directory
* tree no longer exists. Implementation is pretty similar to
* ReleaseRelationBuffers() which is for destroying just one relation.
* DropRelationBuffers() which is for destroying just one relation.
* --------------------------------------------------------------------
*/
void
@@ -1399,33 +1486,32 @@ BufferPoolBlowaway()
/* ---------------------------------------------------------------------
* FlushRelationBuffers
*
* This function flushes all dirty pages of a relation out to disk.
* This function writes all dirty pages of a relation out to disk.
* Furthermore, pages that have blocknumber >= firstDelBlock are
* actually removed from the buffer pool. An error code is returned
* if we fail to dump a dirty buffer or if we find one of
* the target pages is pinned into the cache.
*
* This is used by VACUUM before truncating the relation to the given
* number of blocks. (TRUNCATE TABLE also uses it in the same way.)
* It might seem unnecessary to flush dirty pages before firstDelBlock,
* since VACUUM should already have committed its changes. However,
* it is possible for there still to be dirty pages: if some page
* had unwritten on-row tuple status updates from a prior transaction,
* and VACUUM had no additional changes to make to that page, then
* VACUUM won't have written it. This is harmless in most cases but
* will break pg_upgrade, which relies on VACUUM to ensure that *all*
* tuples have correct on-row status. So, we check and flush all
* dirty pages of the rel regardless of block number.
* This is called by DROP TABLE to clear buffers for the relation
* from the buffer pool. Note that we must write dirty buffers,
* rather than just dropping the changes, because our transaction
* might abort later on; we want to roll back safely in that case.
*
* This is also used by RENAME TABLE (with firstDelBlock = 0)
* to clear out the buffer cache before renaming the physical files of
* a relation. Without that, some other backend might try to do a
* blind write of a buffer page (relying on the BlindId of the buffer)
* and fail because it's not got the right filename anymore.
* This is also called by VACUUM before truncating the relation to the
* given number of blocks. It might seem unnecessary for VACUUM to
* write dirty pages before firstDelBlock, since VACUUM should already
* have committed its changes. However, it is possible for there still
* to be dirty pages: if some page had unwritten on-row tuple status
* updates from a prior transaction, and VACUUM had no additional
* changes to make to that page, then VACUUM won't have written it.
* This is harmless in most cases but will break pg_upgrade, which
* relies on VACUUM to ensure that *all* tuples have correct on-row
* status. So, we check and flush all dirty pages of the rel
* regardless of block number.
*
* In all cases, the caller should be holding AccessExclusiveLock on
* the target relation to ensure that no other backend is busy reading
* more blocks of the relation.
* more blocks of the relation (or might do so before we commit).
*
* Formerly, we considered it an error condition if we found dirty
* buffers here. However, since BufferSync no longer forces out all

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.71 2000/07/17 03:05:08 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.72 2000/11/08 22:10:00 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -453,7 +453,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
{
XIDLookupEnt *result,
XIDLookupEnt *xident,
item;
HTAB *xidTable;
bool found;
@@ -559,9 +559,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
/*
* Find or create an xid entry with this tag
*/
result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
if (!result)
if (!xident)
{
SpinRelease(masterLock);
elog(NOTICE, "LockAcquire: xid table corrupted");
@@ -573,16 +573,41 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
if (!found)
{
result->nHolding = 0;
MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&result->queue);
XID_PRINT("LockAcquire: new", result);
xident->nHolding = 0;
MemSet((char *) xident->holders, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&xident->queue);
XID_PRINT("LockAcquire: new", xident);
}
else
{
XID_PRINT("LockAcquire: found", result);
Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
Assert(result->nHolding <= lock->nActive);
int i;
XID_PRINT("LockAcquire: found", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] >= 0));
Assert(xident->nHolding <= lock->nActive);
/*
* Issue warning if we already hold a lower-level lock on this
* object and do not hold a lock of the requested level or higher.
* This indicates a deadlock-prone coding practice (eg, we'd have
* a deadlock if another backend were following the same code path
* at about the same time).
*
* XXX Doing numeric comparison on the lockmodes is a hack;
* it'd be better to use a table. For now, though, this works.
*/
for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
{
if (xident->holders[i] > 0)
{
if (i >= (int) lockmode)
break; /* safe: we have a lock >= req level */
elog(DEBUG, "Deadlock risk: raising lock level"
" from %s to %s on object %u/%u/%u",
lock_types[i], lock_types[lockmode],
lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
break;
}
}
}
/* ----------------
@@ -601,12 +626,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* hold this lock.
* --------------------
*/
if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
if (xident->nHolding == lock->nActive || xident->holders[lockmode] != 0)
{
result->holders[lockmode]++;
result->nHolding++;
XID_PRINT("LockAcquire: owning", result);
Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
xident->holders[lockmode]++;
xident->nHolding++;
XID_PRINT("LockAcquire: owning", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
GrantLock(lock, lockmode);
SpinRelease(masterLock);
return TRUE;
@@ -623,27 +648,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* If I don't hold locks or my locks don't conflict with waiters
* then force to sleep.
*/
if (result->nHolding > 0)
if (xident->nHolding > 0)
{
for (; i <= lockMethodTable->ctl->numLockModes; i++)
{
if (result->holders[i] > 0 &&
if (xident->holders[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
break; /* conflict */
}
}
if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
if (xident->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
{
XID_PRINT("LockAcquire: higher priority proc waiting",
result);
xident);
status = STATUS_FOUND;
}
else
status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
}
else
status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
if (status == STATUS_OK)
GrantLock(lock, lockmode);
@@ -657,17 +682,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
if (lockmethod == USER_LOCKMETHOD)
{
if (!result->nHolding)
if (!xident->nHolding)
{
SHMQueueDelete(&result->queue);
result = (XIDLookupEnt *) hash_search(xidTable,
(Pointer) result,
SHMQueueDelete(&xident->queue);
xident = (XIDLookupEnt *) hash_search(xidTable,
(Pointer) xident,
HASH_REMOVE, &found);
if (!result || !found)
if (!xident || !found)
elog(NOTICE, "LockAcquire: remove xid, table corrupted");
}
else
XID_PRINT("LockAcquire: NHOLDING", result);
XID_PRINT("LockAcquire: NHOLDING", xident);
lock->nHolding--;
lock->holders[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
@@ -682,7 +707,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* Construct bitmask of locks we hold before going to sleep.
*/
MyProc->holdLock = 0;
if (result->nHolding > 0)
if (xident->nHolding > 0)
{
int i,
tmpMask = 2;
@@ -690,7 +715,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
for (i = 1; i <= lockMethodTable->ctl->numLockModes;
i++, tmpMask <<= 1)
{
if (result->holders[i] > 0)
if (xident->holders[i] > 0)
MyProc->holdLock |= tmpMask;
}
Assert(MyProc->holdLock != 0);
@@ -702,15 +727,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* Check the xid entry status, in case something in the ipc
* communication doesn't work correctly.
*/
if (!((result->nHolding > 0) && (result->holders[lockmode] > 0)))
if (!((xident->nHolding > 0) && (xident->holders[lockmode] > 0)))
{
XID_PRINT("LockAcquire: INCONSISTENT", result);
XID_PRINT("LockAcquire: INCONSISTENT", xident);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
SpinRelease(masterLock);
return FALSE;
}
XID_PRINT("LockAcquire: granted", result);
XID_PRINT("LockAcquire: granted", xident);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
@@ -738,7 +763,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
TransactionId xid,
XIDLookupEnt *xidentP) /* xident ptr or NULL */
{
XIDLookupEnt *result,
XIDLookupEnt *xident,
item;
int *myHolders;
int numLockModes;
@@ -758,7 +783,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* A pointer to the xid entry was supplied from the caller.
* Actually only LockAcquire can do it.
*/
result = xidentP;
xident = xidentP;
}
else
{
@@ -788,9 +813,9 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
/*
* Find or create an xid entry with this tag
*/
result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
if (!result)
if (!xident)
{
elog(NOTICE, "LockResolveConflicts: xid table corrupted");
return STATUS_ERROR;
@@ -808,14 +833,14 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* the lock stats.
* ---------------
*/
MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
result->nHolding = 0;
XID_PRINT("LockResolveConflicts: NOT FOUND", result);
MemSet(xident->holders, 0, numLockModes * sizeof(*(lock->holders)));
xident->nHolding = 0;
XID_PRINT("LockResolveConflicts: NOT FOUND", xident);
}
else
XID_PRINT("LockResolveConflicts: found", result);
XID_PRINT("LockResolveConflicts: found", xident);
}
Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
/* ----------------------------
* first check for global conflicts: If no locks conflict
@@ -829,10 +854,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
*/
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
{
result->holders[lockmode]++;
result->nHolding++;
XID_PRINT("LockResolveConflicts: no conflict", result);
Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
xident->holders[lockmode]++;
xident->nHolding++;
XID_PRINT("LockResolveConflicts: no conflict", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
return STATUS_OK;
}
@@ -842,7 +867,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* that does not reflect our own locks.
* ------------------------
*/
myHolders = result->holders;
myHolders = xident->holders;
bitmask = 0;
tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
@@ -861,14 +886,14 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
{
/* no conflict. Get the lock and go on */
result->holders[lockmode]++;
result->nHolding++;
XID_PRINT("LockResolveConflicts: resolved", result);
Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
xident->holders[lockmode]++;
xident->nHolding++;
XID_PRINT("LockResolveConflicts: resolved", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
return STATUS_OK;
}
XID_PRINT("LockResolveConflicts: conflicting", result);
XID_PRINT("LockResolveConflicts: conflicting", xident);
return STATUS_FOUND;
}
@@ -965,7 +990,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
SPINLOCK masterLock;
bool found;
LOCKMETHODTABLE *lockMethodTable;
XIDLookupEnt *result,
XIDLookupEnt *xident,
item;
HTAB *xidTable;
TransactionId xid;
@@ -1053,9 +1078,9 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* Find an xid entry with this tag
*/
xidTable = lockMethodTable->xidHash;
result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_FIND_SAVE, &found);
if (!result || !found)
if (!xident || !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
@@ -1066,23 +1091,23 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
elog(NOTICE, "LockRelease: xid table corrupted");
return FALSE;
}
XID_PRINT("LockRelease: found", result);
Assert(result->tag.lock == MAKE_OFFSET(lock));
XID_PRINT("LockRelease: found", xident);
Assert(xident->tag.lock == MAKE_OFFSET(lock));
/*
* Check that we are actually holding a lock of the type we want to
* release.
*/
if (!(result->holders[lockmode] > 0))
if (!(xident->holders[lockmode] > 0))
{
SpinRelease(masterLock);
XID_PRINT("LockAcquire: WRONGTYPE", result);
XID_PRINT("LockAcquire: WRONGTYPE", xident);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
Assert(result->holders[lockmode] >= 0);
Assert(xident->holders[lockmode] >= 0);
return FALSE;
}
Assert(result->nHolding > 0);
Assert(xident->nHolding > 0);
/*
* fix the general lock stats
@@ -1147,27 +1172,27 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* now check to see if I have any private locks. If I do, decrement
* the counts associated with them.
*/
result->holders[lockmode]--;
result->nHolding--;
XID_PRINT("LockRelease: updated", result);
Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
xident->holders[lockmode]--;
xident->nHolding--;
XID_PRINT("LockRelease: updated", xident);
Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
/*
* If this was my last hold on this lock, delete my entry in the XID
* table.
*/
if (!result->nHolding)
if (!xident->nHolding)
{
if (result->queue.prev == INVALID_OFFSET)
if (xident->queue.prev == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
if (result->queue.next == INVALID_OFFSET)
if (xident->queue.next == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
if (result->queue.next != INVALID_OFFSET)
SHMQueueDelete(&result->queue);
XID_PRINT("LockRelease: deleting", result);
result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
if (xident->queue.next != INVALID_OFFSET)
SHMQueueDelete(&xident->queue);
XID_PRINT("LockRelease: deleting", xident);
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &xident,
HASH_REMOVE_SAVED, &found);
if (!result || !found)
if (!xident || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockRelease: remove xid, table corrupted");
@@ -1196,7 +1221,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
int done;
XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL;
XIDLookupEnt *result;
XIDLookupEnt *xident;
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
@@ -1371,11 +1396,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
*/
XID_PRINT("LockReleaseAll: deleting", xidLook);
result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
xident = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
(Pointer) xidLook,
HASH_REMOVE,
&found);
if (!result || !found)
if (!xident || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockReleaseAll: xid table corrupted");

View File

@@ -8,17 +8,17 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.77 2000/10/28 16:20:57 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.78 2000/11/08 22:10:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/file.h>
#include "postgres.h"
#include "catalog/catalog.h"
#include "miscadmin.h"
#include "storage/smgr.h"
@@ -123,63 +123,39 @@ mdinit()
int
mdcreate(Relation reln)
{
char *path;
int fd,
vfd;
char *path;
Assert(reln->rd_unlinked && reln->rd_fd < 0);
Assert(reln->rd_fd < 0);
path = relpath(reln->rd_node);
fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
/*
* For cataloged relations, pg_class is guaranteed to have a unique
* record with the same relname by the unique index. So we are able to
* reuse existent files for new cataloged relations. Currently we reuse
* them in the following cases. 1. they are empty. 2. they are used
* for Index relations and their size == BLCKSZ * 2.
*
* During bootstrap processing, we skip that check, because pg_time,
* pg_variable, and pg_log get created before their .bki file entries
* are processed.
*/
fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
if (fd < 0)
{
int save_errno = errno;
if (!IsBootstrapProcessingMode() &&
reln->rd_rel->relkind == RELKIND_UNCATALOGED)
return -1;
fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
/*
* During bootstrap, there are cases where a system relation will be
* accessed (by internal backend processes) before the bootstrap
* script nominally creates it. Therefore, allow the file to exist
* already, but in bootstrap mode only. (See also mdopen)
*/
if (IsBootstrapProcessingMode())
fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
if (fd < 0)
{
pfree(path);
/* be sure to return the error reported by create, not open */
errno = save_errno;
return -1;
}
if (!IsBootstrapProcessingMode())
{
bool reuse = false;
long len = FileSeek(fd, 0L, SEEK_END);
if (len == 0)
reuse = true;
else if (reln->rd_rel->relkind == RELKIND_INDEX &&
len == BLCKSZ * 2)
reuse = true;
if (!reuse)
{
FileClose(fd);
/* be sure to return the error reported by create */
errno = save_errno;
return -1;
}
}
errno = 0;
}
reln->rd_unlinked = false;
pfree(path);
vfd = _fdvec_alloc();
if (vfd < 0)
@@ -187,12 +163,10 @@ mdcreate(Relation reln)
Md_fdvec[vfd].mdfd_vfd = fd;
Md_fdvec[vfd].mdfd_flags = (uint16) 0;
Md_fdvec[vfd].mdfd_lstbcnt = 0;
#ifndef LET_OS_MANAGE_FILESIZE
Md_fdvec[vfd].mdfd_chain = (MdfdVec *) NULL;
#endif
Md_fdvec[vfd].mdfd_lstbcnt = 0;
pfree(path);
return vfd;
}
@@ -201,65 +175,50 @@ mdcreate(Relation reln)
* mdunlink() -- Unlink a relation.
*/
int
mdunlink(Relation reln)
mdunlink(RelFileNode rnode)
{
int nblocks;
int fd;
MdfdVec *v;
int status = SM_SUCCESS;
int save_errno = 0;
char *path;
/*
* If the relation is already unlinked,we have nothing to do any more.
*/
if (reln->rd_unlinked && reln->rd_fd < 0)
return SM_SUCCESS;
path = relpath(rnode);
/*
* Force all segments of the relation to be opened, so that we won't
* miss deleting any of them.
*/
nblocks = mdnblocks(reln);
/*
* Clean out the mdfd vector, letting fd.c unlink the physical files.
*
* NOTE: We truncate the file(s) before deleting 'em, because if other
* backends are holding the files open, the unlink will fail on some
* platforms (think Microsoft). Better a zero-size file gets left
* around than a big file. Those other backends will be forced to
* close the relation by cache invalidation, but that probably hasn't
* happened yet.
*/
fd = RelationGetFile(reln);
if (fd < 0) /* should not happen */
elog(ERROR, "mdunlink: mdnblocks didn't open relation");
Md_fdvec[fd].mdfd_flags = (uint16) 0;
/* Delete the first segment, or only segment if not doing segmenting */
if (unlink(path) < 0)
{
status = SM_FAIL;
save_errno = errno;
}
#ifndef LET_OS_MANAGE_FILESIZE
for (v = &Md_fdvec[fd]; v != (MdfdVec *) NULL;)
/* Get the additional segments, if any */
if (status == SM_SUCCESS)
{
MdfdVec *ov = v;
char *segpath = (char *) palloc(strlen(path) + 12);
int segno;
FileTruncate(v->mdfd_vfd, 0);
FileUnlink(v->mdfd_vfd);
v = v->mdfd_chain;
if (ov != &Md_fdvec[fd])
pfree(ov);
for (segno = 1; ; segno++)
{
sprintf(segpath, "%s.%d", path, segno);
if (unlink(segpath) < 0)
{
/* ENOENT is expected after the last segment... */
if (errno != ENOENT)
{
status = SM_FAIL;
save_errno = errno;
}
break;
}
}
pfree(segpath);
}
Md_fdvec[fd].mdfd_chain = (MdfdVec *) NULL;
#else
v = &Md_fdvec[fd];
FileTruncate(v->mdfd_vfd, 0);
FileUnlink(v->mdfd_vfd);
#endif
_fdvec_free(fd);
pfree(path);
/* be sure to mark relation closed && unlinked */
reln->rd_fd = -1;
reln->rd_unlinked = true;
return SM_SUCCESS;
errno = save_errno;
return status;
}
/*
@@ -327,24 +286,29 @@ mdopen(Relation reln)
int vfd;
Assert(reln->rd_fd < 0);
path = relpath(reln->rd_node);
fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
if (fd < 0)
{
/* in bootstrap mode, accept mdopen as substitute for mdcreate */
/*
* During bootstrap, there are cases where a system relation will be
* accessed (by internal backend processes) before the bootstrap
* script nominally creates it. Therefore, accept mdopen() as a
* substitute for mdcreate() in bootstrap mode only. (See mdcreate)
*/
if (IsBootstrapProcessingMode())
fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
if (fd < 0)
{
elog(NOTICE, "mdopen: couldn't open %s: %m", path);
/* mark relation closed and unlinked */
reln->rd_fd = -1;
reln->rd_unlinked = true;
pfree(path);
return -1;
}
}
reln->rd_unlinked = false;
pfree(path);
vfd = _fdvec_alloc();
if (vfd < 0)
@@ -362,8 +326,6 @@ mdopen(Relation reln)
#endif
#endif
pfree(path);
return vfd;
}

View File

@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/Attic/mm.c,v 1.19 2000/04/10 23:41:51 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/Attic/mm.c,v 1.20 2000/11/08 22:10:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -204,9 +204,11 @@ mmcreate(Relation reln)
/*
* mmunlink() -- Unlink a relation.
*
* XXX currently broken: needs to accept RelFileNode, not Relation
*/
int
mmunlink(Relation reln)
mmunlink(RelFileNode rnode)
{
int i;
Oid reldbid;

View File

@@ -11,13 +11,16 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.42 2000/10/28 16:20:57 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.43 2000/11/08 22:10:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
static void smgrshutdown(void);
@@ -26,7 +29,7 @@ typedef struct f_smgr
int (*smgr_init) (void); /* may be NULL */
int (*smgr_shutdown) (void); /* may be NULL */
int (*smgr_create) (Relation reln);
int (*smgr_unlink) (Relation reln);
int (*smgr_unlink) (RelFileNode rnode);
int (*smgr_extend) (Relation reln, char *buffer);
int (*smgr_open) (Relation reln);
int (*smgr_close) (Relation reln);
@@ -60,10 +63,11 @@ static f_smgr smgrsw[] = {
{mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
#ifdef XLOG
mdnblocks, mdtruncate, mdcommit, mdabort, mdsync},
mdnblocks, mdtruncate, mdcommit, mdabort, mdsync
#else
mdnblocks, mdtruncate, mdcommit, mdabort},
mdnblocks, mdtruncate, mdcommit, mdabort
#endif
},
#ifdef STABLE_MEMORY_STORAGE
/* main memory */
@@ -93,6 +97,31 @@ static bool smgrwo[] = {
static int NSmgr = lengthof(smgrsw);
/*
* We keep a list of all relations (represented as RelFileNode values)
* that have been created or deleted in the current transaction. When
* a relation is created, we create the physical file immediately, but
* remember it so that we can delete the file again if the current
* transaction is aborted. Conversely, a deletion request is NOT
* executed immediately, but is just entered in the list. When and if
* the transaction commits, we can delete the physical file.
*
* NOTE: the list is kept in TopMemoryContext to be sure it won't disappear
* unbetimes. It'd probably be OK to keep it in TopTransactionContext,
* but I'm being paranoid.
*/
typedef struct PendingRelDelete
{
RelFileNode relnode; /* relation that may need to be deleted */
int16 which; /* which storage manager? */
bool atCommit; /* T=delete at commit; F=delete at abort */
struct PendingRelDelete *next; /* linked-list link */
} PendingRelDelete;
static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
/*
* smgrinit(), smgrshutdown() -- Initialize or shut down all storage
* managers.
@@ -147,27 +176,58 @@ int
smgrcreate(int16 which, Relation reln)
{
int fd;
PendingRelDelete *pending;
if ((fd = (*(smgrsw[which].smgr_create)) (reln)) < 0)
elog(ERROR, "cannot create %s: %m", RelationGetRelationName(reln));
/* Add the relation to the list of stuff to delete at abort */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
pending->relnode = reln->rd_node;
pending->which = which;
pending->atCommit = false; /* delete if abort */
pending->next = pendingDeletes;
pendingDeletes = pending;
return fd;
}
/*
* smgrunlink() -- Unlink a relation.
*
* The relation is removed from the store.
* The relation is removed from the store. Actually, we just remember
* that we want to do this at transaction commit.
*/
int
smgrunlink(int16 which, Relation reln)
{
int status;
PendingRelDelete *pending;
if ((status = (*(smgrsw[which].smgr_unlink)) (reln)) == SM_FAIL)
elog(ERROR, "cannot unlink %s: %m", RelationGetRelationName(reln));
/* Make sure the file is closed */
if (reln->rd_fd >= 0)
smgrclose(which, reln);
return status;
/* Add the relation to the list of stuff to delete at commit */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
pending->relnode = reln->rd_node;
pending->which = which;
pending->atCommit = true; /* delete if commit */
pending->next = pendingDeletes;
pendingDeletes = pending;
/*
* NOTE: if the relation was created in this transaction, it will now
* be present in the pending-delete list twice, once with atCommit true
* and once with atCommit false. Hence, it will be physically deleted
* at end of xact in either case (and the other entry will be ignored
* by smgrDoPendingDeletes, so no error will occur). We could instead
* remove the existing list entry and delete the physical file
* immediately, but for now I'll keep the logic simple.
*/
return SM_SUCCESS;
}
/*
@@ -193,17 +253,18 @@ smgrextend(int16 which, Relation reln, char *buffer)
/*
* smgropen() -- Open a relation using a particular storage manager.
*
* Returns the fd for the open relation on success, aborts the
* transaction on failure.
* Returns the fd for the open relation on success.
*
* On failure, returns -1 if failOK, else aborts the transaction.
*/
int
smgropen(int16 which, Relation reln)
smgropen(int16 which, Relation reln, bool failOK)
{
int fd;
if ((fd = (*(smgrsw[which].smgr_open)) (reln)) < 0 &&
!reln->rd_unlinked)
elog(ERROR, "cannot open %s: %m", RelationGetRelationName(reln));
if ((fd = (*(smgrsw[which].smgr_open)) (reln)) < 0)
if (! failOK)
elog(ERROR, "cannot open %s: %m", RelationGetRelationName(reln));
return fd;
}
@@ -211,12 +272,6 @@ smgropen(int16 which, Relation reln)
/*
* smgrclose() -- Close a relation.
*
* NOTE: underlying manager should allow case where relation is
* already closed. Indeed relation may have been unlinked!
* This is currently called only from RelationFlushRelation() when
* the relation cache entry is about to be dropped; could be doing
* simple relation cache clear, or finishing up DROP TABLE.
*
* Returns SM_SUCCESS on success, aborts on failure.
*/
int
@@ -411,6 +466,41 @@ smgrtruncate(int16 which, Relation reln, int nblocks)
return newblks;
}
/*
* smgrDoPendingDeletes() -- take care of relation deletes at end of xact.
*/
int
smgrDoPendingDeletes(bool isCommit)
{
while (pendingDeletes != NULL)
{
PendingRelDelete *pending = pendingDeletes;
pendingDeletes = pending->next;
if (pending->atCommit == isCommit)
{
/*
* Get rid of any leftover buffers for the rel (shouldn't be
* any in the commit case, but there can be in the abort case).
*/
DropRelFileNodeBuffers(pending->relnode);
/*
* And delete the physical files.
*
* Note: we treat deletion failure as a NOTICE, not an error,
* because we've already decided to commit or abort the current
* xact.
*/
if ((*(smgrsw[pending->which].smgr_unlink)) (pending->relnode) == SM_FAIL)
elog(NOTICE, "cannot unlink %u/%u: %m",
pending->relnode.tblNode, pending->relnode.relNode);
}
pfree(pending);
}
return SM_SUCCESS;
}
/*
* smgrcommit(), smgrabort() -- Commit or abort changes made during the
* current transaction.