1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Nested transactions. There is still much left to do, especially on the

performance front, but with feature freeze upon us I think it's time to
drive a stake in the ground and say that this will be in 7.5.

Alvaro Herrera, with some help from Tom Lane.
This commit is contained in:
Tom Lane
2004-07-01 00:52:04 +00:00
parent 4c9aa572fa
commit 573a71a5da
74 changed files with 4516 additions and 1144 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.171 2004/06/18 06:13:33 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.172 2004/07/01 00:50:46 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -45,6 +45,7 @@
#include "storage/bufpage.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "pgstat.h"
@@ -64,9 +65,13 @@ long NDirectFileRead; /* some I/O's are direct file access.
* bypass bufmgr */
long NDirectFileWrite; /* e.g., I/O in psort and hashjoin. */
/* List of upper-level-transaction buffer refcount arrays */
static List *upperRefCounts = NIL;
static void PinBuffer(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf);
static void BufferFixLeak(Buffer bufnum, int32 shouldBe, bool emitWarning);
static void WaitIO(BufferDesc *buf);
static void StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf, int err_flag);
@@ -826,30 +831,104 @@ AtEOXact_Buffers(bool isCommit)
for (i = 0; i < NBuffers; i++)
{
if (PrivateRefCount[i] != 0)
{
BufferDesc *buf = &(BufferDescriptors[i]);
if (isCommit)
elog(WARNING,
"buffer refcount leak: [%03d] "
"(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
i,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
PrivateRefCount[i] = 1; /* make sure we release shared pin */
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(buf);
LWLockRelease(BufMgrLock);
Assert(PrivateRefCount[i] == 0);
}
BufferFixLeak(i, 0, isCommit);
}
AtEOXact_LocalBuffers(isCommit);
}
/*
* During subtransaction start, save buffer reference counts.
*/
void
AtSubStart_Buffers(void)
{
int32 *copyRefCounts;
Size rcSize;
MemoryContext old_cxt;
/* this is probably the active context already, but be safe */
old_cxt = MemoryContextSwitchTo(CurTransactionContext);
/*
* We need to copy the current state of PrivateRefCount[]. In the typical
* scenario, few if any of the entries will be nonzero, and we could save
* space by storing only the nonzero ones. However, copying the whole
* thing is lots simpler and faster both here and in AtEOSubXact_Buffers,
* so it seems best to waste the space.
*/
rcSize = NBuffers * sizeof(int32);
copyRefCounts = (int32 *) palloc(rcSize);
memcpy(copyRefCounts, PrivateRefCount, rcSize);
/* Attach to list */
upperRefCounts = lcons(copyRefCounts, upperRefCounts);
MemoryContextSwitchTo(old_cxt);
}
/*
* AtEOSubXact_Buffers
*
* At subtransaction end, we restore the saved counts. If committing, we
* complain if the refcounts don't match; if aborting, just restore silently.
*/
void
AtEOSubXact_Buffers(bool isCommit)
{
int32 *oldRefCounts;
int i;
oldRefCounts = (int32 *) linitial(upperRefCounts);
upperRefCounts = list_delete_first(upperRefCounts);
for (i = 0; i < NBuffers; i++)
{
if (PrivateRefCount[i] != oldRefCounts[i])
BufferFixLeak(i, oldRefCounts[i], isCommit);
}
pfree(oldRefCounts);
}
/*
* Fix a buffer refcount leak.
*
* The caller does not hold the BufMgrLock.
*/
static void
BufferFixLeak(Buffer bufnum, int32 shouldBe, bool emitWarning)
{
BufferDesc *buf = &(BufferDescriptors[bufnum]);
if (emitWarning)
elog(WARNING,
"buffer refcount leak: [%03d] (rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d, should be=%d)",
bufnum,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[bufnum], shouldBe);
/* If it's less, we're in a heap o' trouble */
if (PrivateRefCount[bufnum] <= shouldBe)
elog(FATAL, "buffer refcount was decreased by subtransaction");
if (shouldBe > 0)
{
/* We still keep the shared-memory pin */
PrivateRefCount[bufnum] = shouldBe;
}
else
{
PrivateRefCount[bufnum] = 1; /* make sure we release shared pin */
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(buf);
LWLockRelease(BufMgrLock);
Assert(PrivateRefCount[bufnum] == 0);
}
}
/*
* FlushBufferPool
*

View File

@@ -8,16 +8,16 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.68 2004/05/29 22:48:20 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.69 2004/07/01 00:50:52 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/clog.h"
#include "access/subtrans.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
@@ -70,6 +70,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate,
size += LockShmemSize(maxBackends);
size += XLOGShmemSize();
size += CLOGShmemSize();
size += SUBTRANSShmemSize();
size += LWLockShmemSize();
size += SInvalShmemSize(maxBackends);
size += FreeSpaceShmemSize();
@@ -133,6 +134,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate,
*/
XLOGShmemInit();
CLOGShmemInit();
SUBTRANSShmemInit();
InitBufferPool();
/*

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.64 2004/06/02 21:29:28 momjian Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.65 2004/07/01 00:50:52 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,6 +16,8 @@
#include <signal.h>
#include "access/subtrans.h"
#include "access/transam.h"
#include "commands/async.h"
#include "storage/ipc.h"
#include "storage/proc.h"
@@ -428,20 +430,40 @@ DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
/*
* TransactionIdIsInProgress -- is given transaction running by some backend
*
* There are three possibilities for finding a running transaction:
*
* 1. the given Xid is a main transaction Id. We will find this out cheaply
* by looking at the PGPROC struct for each backend.
*
* 2. the given Xid is one of the cached subxact Xids in the PGPROC array.
* We can find this out cheaply too.
*
* 3. Search the SubTrans tree. This is the slowest, but sadly it has to be
* done always if the other two failed.
*
* SInvalLock has to be held while we do 1 and 2. If we save all the Xids
* while doing 1, we can release the SInvalLock while we do 3. This buys back
* some concurrency (we can't retrieve the main Xids from PGPROC again anyway,
* see GetNewTransactionId)
*/
bool
TransactionIdIsInProgress(TransactionId xid)
{
bool result = false;
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
int index;
bool result = false;
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
int i;
int nxids = 0;
TransactionId *xids;
xids = (TransactionId *)palloc(sizeof(TransactionId) * segP->maxBackends);
LWLockAcquire(SInvalLock, LW_SHARED);
for (index = 0; index < segP->lastBackend; index++)
for (i = 0; i < segP->lastBackend; i++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
SHMEM_OFFSET pOffset = stateP[i].procStruct;
if (pOffset != INVALID_OFFSET)
{
@@ -450,16 +472,71 @@ TransactionIdIsInProgress(TransactionId xid)
/* Fetch xid just once - see GetNewTransactionId */
TransactionId pxid = proc->xid;
/*
* check the main Xid (step 1 above)
*/
if (TransactionIdEquals(pxid, xid))
{
result = true;
break;
}
/*
* save the main Xid for step 3.
*/
xids[nxids++] = pxid;
#ifdef NOT_USED
FIXME -- waiting to save the Xids in PGPROC ...
/*
* check the saved Xids array (step 2)
*/
for (j = 0; j < PGPROC_MAX_SAVED_XIDS; j++)
{
pxid = proc->savedxids[j];
if (!TransactionIdIsValid(pxids))
break;
if (TransactionIdEquals(pxid, xid))
{
result = true;
break;
}
}
#endif
if (result)
break;
}
}
LWLockRelease(SInvalLock);
/*
* Step 3: have to check pg_subtrans. Use the saved Xids.
*
* XXX Could save the cached Xids too for further improvement.
*/
if (!result)
{
/* this is a potentially expensive call. */
xid = SubTransGetTopmostTransaction(xid);
Assert(TransactionIdIsValid(xid));
/*
* We don't care if it aborted, because if it did, we won't find
* it in the array.
*/
for (i = 0; i < nxids; i++)
if (TransactionIdEquals(xids[i], xid))
return true;
}
return result;
}
@@ -596,7 +673,7 @@ GetSnapshotData(Snapshot snapshot, bool serializable)
* This does open a possibility for avoiding repeated malloc/free:
* since MaxBackends does not change at runtime, we can simply reuse
* the previous xip array if any. (This relies on the fact that all
* calls pass static SnapshotData structs.)
* callers pass static SnapshotData structs.)
*/
if (snapshot->xip == NULL)
{

View File

@@ -8,13 +8,14 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.63 2004/05/28 05:13:04 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.64 2004/07/01 00:50:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
@@ -333,19 +334,21 @@ XactLockTableInsert(TransactionId xid)
* XactLockTableWait
*
* Wait for the specified transaction to commit or abort.
* We actually wait on the topmost transaction of the transaction tree.
*/
void
XactLockTableWait(TransactionId xid)
{
LOCKTAG tag;
TransactionId myxid = GetCurrentTransactionId();
TransactionId waitXid = SubTransGetTopmostTransaction(xid);
Assert(!TransactionIdEquals(xid, myxid));
Assert(!SubTransXidsHaveCommonAncestor(waitXid, myxid));
MemSet(&tag, 0, sizeof(tag));
tag.relId = XactLockTableId;
tag.dbId = InvalidOid;
tag.objId.xid = xid;
tag.objId.xid = waitXid;
if (!LockAcquire(LockTableId, &tag, myxid,
ShareLock, false))
@@ -355,8 +358,13 @@ XactLockTableWait(TransactionId xid)
/*
* Transaction was committed/aborted/crashed - we have to update
* pg_clog if transaction is still marked as running.
* pg_clog if transaction is still marked as running. If it's a
* subtransaction, we can update the parent status too.
*/
if (!TransactionIdDidCommit(xid) && !TransactionIdDidAbort(xid))
TransactionIdAbort(xid);
if (!TransactionIdDidCommit(waitXid) && !TransactionIdDidAbort(waitXid))
{
TransactionIdAbort(waitXid);
if (waitXid != xid)
TransactionIdAbort(xid);
}
}

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.133 2004/06/05 19:48:08 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.134 2004/07/01 00:50:59 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -23,7 +23,7 @@
* Interface:
*
* LockAcquire(), LockRelease(), LockMethodTableInit(),
* LockMethodTableRename(), LockReleaseAll,
* LockMethodTableRename(), LockReleaseAll(),
* LockCheckConflicts(), GrantLock()
*
*-------------------------------------------------------------------------
@@ -1129,19 +1129,25 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
/*
* LockReleaseAll -- Release all locks in a process's lock list.
* LockReleaseAll -- Release all locks of the specified lock method that
* are held by the specified process.
*
* Well, not really *all* locks.
* Well, not necessarily *all* locks. The available behaviors are:
*
* If 'allxids' is TRUE, all locks of the specified lock method are
* released, regardless of transaction affiliation.
* which == ReleaseAll: release all locks regardless of transaction
* affiliation.
*
* If 'allxids' is FALSE, all locks of the specified lock method and
* specified XID are released.
* which == ReleaseAllExceptSession: release all locks with Xid != 0
* (zero is the Xid used for "session" locks).
*
* which == ReleaseGivenXids: release only locks whose Xids appear in
* the xids[] array (of length nxids).
*
* xids/nxids are ignored when which != ReleaseGivenXids.
*/
bool
LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
bool allxids, TransactionId xid)
LockReleaseWhich which, int nxids, TransactionId *xids)
{
SHM_QUEUE *procHolders = &(proc->procHolders);
PROCLOCK *proclock;
@@ -1190,8 +1196,25 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
goto next_item;
/* If not allxids, ignore items that are of the wrong xid */
if (!allxids && !TransactionIdEquals(xid, proclock->tag.xid))
if (which == ReleaseGivenXids)
{
/* Ignore locks with an Xid not in the list */
bool release = false;
for (i = 0; i < nxids; i++)
{
if (TransactionIdEquals(proclock->tag.xid, xids[i]))
{
release = true;
break;
}
}
if (!release)
goto next_item;
}
/* Ignore locks with Xid=0 unless we are asked to release All locks */
else if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
&& which != ReleaseAll)
goto next_item;
PROCLOCK_PRINT("LockReleaseAll", proclock);

View File

@@ -15,13 +15,14 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.20 2004/06/11 16:43:24 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.21 2004/07/01 00:50:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/clog.h"
#include "access/subtrans.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"
@@ -111,6 +112,9 @@ NumLWLocks(void)
/* clog.c needs one per CLOG buffer + one control lock */
numLocks += NUM_CLOG_BUFFERS + 1;
/* subtrans.c needs one per SubTrans buffer + one control lock */
numLocks += NUM_SUBTRANS_BUFFERS + 1;
/* Perhaps create a few more for use by user-defined modules? */
return numLocks;

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.148 2004/05/29 22:48:20 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.149 2004/07/01 00:50:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -380,26 +380,34 @@ LockWaitCancel(void)
/*
* ProcReleaseLocks() -- release locks associated with current transaction
* at transaction commit or abort
* at main transaction and subtransaction commit or abort
*
* At commit, we release only locks tagged with the current transaction's XID,
* leaving those marked with XID 0 (ie, session locks) undisturbed. At abort,
* we release all locks including XID 0, because we need to clean up after
* a failure. This logic will need extension if we ever support nested
* transactions.
* The options for which locks to release are the same as for the underlying
* LockReleaseAll() function.
*
* Note that user locks are not released in either case.
* Notes:
*
* At main transaction commit, we release all locks except session locks.
* At main transaction abort, we release all locks including session locks;
* this lets us clean up after a VACUUM FULL failure.
*
* At subtransaction commit, we don't release any locks (so this func is not
* called at all); we will defer the releasing to the parent transaction.
* At subtransaction abort, we release all locks held by the subtransaction;
* this is implemented by passing in the Xids of the failed subxact and its
* children in the xids[] array.
*
* Note that user locks are not released in any case.
*/
void
ProcReleaseLocks(bool isCommit)
ProcReleaseLocks(LockReleaseWhich which, int nxids, TransactionId *xids)
{
if (!MyProc)
return;
/* If waiting, get off wait queue (should only be needed after error) */
LockWaitCancel();
/* Release locks */
LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc,
!isCommit, GetCurrentTransactionId());
LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, which, nxids, xids);
}
@@ -432,11 +440,11 @@ ProcKill(int code, Datum arg)
LockWaitCancel();
/* Remove from the standard lock table */
LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, true, InvalidTransactionId);
LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, ReleaseAll, 0, NULL);
#ifdef USER_LOCKS
/* Remove from the user lock table */
LockReleaseAll(USER_LOCKMETHOD, MyProc, true, InvalidTransactionId);
LockReleaseAll(USER_LOCKMETHOD, MyProc, ReleaseAll, 0, NULL);
#endif
SpinLockAcquire(ProcStructLock);

View File

@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.74 2004/06/18 06:13:37 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.75 2004/07/01 00:51:07 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -80,9 +80,10 @@ static HTAB *SMgrRelationHash = NULL;
* executed immediately, but is just entered in the list. When and if
* the transaction commits, we can delete the physical file.
*
* NOTE: the list is kept in TopMemoryContext to be sure it won't disappear
* unbetimes. It'd probably be OK to keep it in TopTransactionContext,
* but I'm being paranoid.
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
* successful subtransactions attach their lists to their parent's list.
* Failed subtransactions can immediately execute the abort-time actions.
*/
typedef struct PendingRelDelete
@@ -91,10 +92,11 @@ typedef struct PendingRelDelete
int which; /* which storage manager? */
bool isTemp; /* is it a temporary relation? */
bool atCommit; /* T=delete at commit; F=delete at abort */
struct PendingRelDelete *next; /* linked-list link */
} PendingRelDelete;
static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
static List *pendingDeletes = NIL; /* head of linked list */
static List *upperPendingDeletes = NIL; /* list of upper-xact lists */
/*
@@ -305,6 +307,7 @@ smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo)
XLogRecData rdata;
xl_smgr_create xlrec;
PendingRelDelete *pending;
MemoryContext old_cxt;
if (! (*(smgrsw[reln->smgr_which].smgr_create)) (reln, isRedo))
ereport(ERROR,
@@ -332,14 +335,17 @@ smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo)
lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLOG_NO_TRAN, &rdata);
/* Add the relation to the list of stuff to delete at abort */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
old_cxt = MemoryContextSwitchTo(CurTransactionContext);
pending = (PendingRelDelete *) palloc(sizeof(PendingRelDelete));
pending->relnode = reln->smgr_rnode;
pending->which = reln->smgr_which;
pending->isTemp = isTemp;
pending->atCommit = false; /* delete if abort */
pending->next = pendingDeletes;
pendingDeletes = pending;
pendingDeletes = lcons(pending, pendingDeletes);
MemoryContextSwitchTo(old_cxt);
}
/*
@@ -354,16 +360,20 @@ void
smgrscheduleunlink(SMgrRelation reln, bool isTemp)
{
PendingRelDelete *pending;
MemoryContext old_cxt;
/* Add the relation to the list of stuff to delete at commit */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
old_cxt = MemoryContextSwitchTo(CurTransactionContext);
pending = (PendingRelDelete *) palloc(sizeof(PendingRelDelete));
pending->relnode = reln->smgr_rnode;
pending->which = reln->smgr_which;
pending->isTemp = isTemp;
pending->atCommit = true; /* delete if commit */
pending->next = pendingDeletes;
pendingDeletes = pending;
pendingDeletes = lcons(pending, pendingDeletes);
MemoryContextSwitchTo(old_cxt);
/*
* NOTE: if the relation was created in this transaction, it will now
@@ -627,18 +637,21 @@ smgrimmedsync(SMgrRelation reln)
void
smgrDoPendingDeletes(bool isCommit)
{
while (pendingDeletes != NULL)
{
PendingRelDelete *pending = pendingDeletes;
ListCell *p;
foreach(p, pendingDeletes)
{
PendingRelDelete *pending = lfirst(p);
pendingDeletes = pending->next;
if (pending->atCommit == isCommit)
smgr_internal_unlink(pending->relnode,
pending->which,
pending->isTemp,
false);
pfree(pending);
}
/* We needn't free the cells since they are in CurTransactionContext */
pendingDeletes = NIL;
}
/*
@@ -647,17 +660,22 @@ smgrDoPendingDeletes(bool isCommit)
* The return value is the number of relations scheduled for termination.
* *ptr is set to point to a freshly-palloc'd array of RelFileNodes.
* If there are no relations to be deleted, *ptr is set to NULL.
*
* Note that the list does not include anything scheduled for termination
* by upper-level transactions.
*/
int
smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
{
int nrels;
RelFileNode *rptr;
PendingRelDelete *pending;
ListCell *p;
nrels = 0;
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
foreach(p, pendingDeletes)
{
PendingRelDelete *pending = lfirst(p);
if (pending->atCommit == forCommit)
nrels++;
}
@@ -668,14 +686,69 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
}
rptr = (RelFileNode *) palloc(nrels * sizeof(RelFileNode));
*ptr = rptr;
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
foreach(p, pendingDeletes)
{
PendingRelDelete *pending = lfirst(p);
if (pending->atCommit == forCommit)
*rptr++ = pending->relnode;
}
return nrels;
}
/*
* AtSubStart_smgr() --- Take care of subtransaction start.
*
* Push empty state for the new subtransaction.
*/
void
AtSubStart_smgr(void)
{
MemoryContext old_cxt;
/* Keep the list-of-lists in TopTransactionContext for simplicity */
old_cxt = MemoryContextSwitchTo(TopTransactionContext);
upperPendingDeletes = lcons(pendingDeletes, upperPendingDeletes);
pendingDeletes = NIL;
MemoryContextSwitchTo(old_cxt);
}
/*
* AtSubCommit_smgr() --- Take care of subtransaction commit.
*
* Reassign all items in the pending deletes list to the parent transaction.
*/
void
AtSubCommit_smgr(void)
{
List *parentPendingDeletes;
parentPendingDeletes = (List *) linitial(upperPendingDeletes);
upperPendingDeletes = list_delete_first(upperPendingDeletes);
pendingDeletes = list_concat(parentPendingDeletes, pendingDeletes);
}
/*
* AtSubAbort_smgr() --- Take care of subtransaction abort.
*
* Delete created relations and forget about deleted relations.
* We can execute these operations immediately because we know this
* subtransaction will not commit.
*/
void
AtSubAbort_smgr(void)
{
smgrDoPendingDeletes(false);
/* Must pop the stack, too */
pendingDeletes = (List *) linitial(upperPendingDeletes);
upperPendingDeletes = list_delete_first(upperPendingDeletes);
}
/*
* smgrcommit() -- Prepare to commit changes made during the current
* transaction.