1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-06 07:49:08 +03:00

Two-phase commit. Original patch by Heikki Linnakangas, with additional

hacking by Alvaro Herrera and Tom Lane.
This commit is contained in:
Tom Lane
2005-06-17 22:32:51 +00:00
parent 5495575903
commit d0a89683a3
61 changed files with 4454 additions and 439 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.155 2005/06/14 22:15:32 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.156 2005/06/17 22:32:45 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -33,6 +33,8 @@
#include <signal.h>
#include <unistd.h>
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/proc.h"
@@ -44,7 +46,15 @@
/* This configuration variable is used to set the lock table size */
int max_locks_per_xact; /* set by guc.c */
#define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends))
#define NLOCKENTS() (max_locks_per_xact * (MaxBackends + max_prepared_xacts))
/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
LOCKTAG locktag;
LOCKMODE lockmode;
} TwoPhaseLockRecord;
/*
@@ -168,8 +178,7 @@ static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
/*
* InitLocks -- Init the lock module. Create a private data
* structure for constructing conflict masks.
* InitLocks -- Init the lock module. Nothing to do here at present.
*/
void
InitLocks(void)
@@ -222,8 +231,7 @@ LockMethodInit(LockMethod lockMethodTable,
LOCKMETHODID
LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP,
int numModes,
int maxBackends)
int numModes)
{
LockMethod newLockMethod;
LOCKMETHODID lockmethodid;
@@ -239,7 +247,7 @@ LockMethodTableInit(const char *tabName,
numModes, MAX_LOCKMODES - 1);
/* Compute init/max size to request for lock hashtables */
max_table_size = NLOCKENTS(maxBackends);
max_table_size = NLOCKENTS();
init_table_size = max_table_size / 2;
/* Allocate a string for the shmem index table lookups. */
@@ -1418,10 +1426,10 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
while (proclock)
{
bool wakeupNeeded = false;
PROCLOCK *nextHolder;
PROCLOCK *nextplock;
/* Get link first, since we may unlink/delete this proclock */
nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
@@ -1474,7 +1482,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
next_item:
proclock = nextHolder;
proclock = nextplock;
}
LWLockRelease(masterLock);
@@ -1605,14 +1613,262 @@ LockReassignCurrentOwner(void)
}
/*
* AtPrepare_Locks
* Do the preparatory work for a PREPARE: make 2PC state file records
* for all locks currently held.
*
* User locks are non-transactional and are therefore ignored.
*
* There are some special cases that we error out on: we can't be holding
* any session locks (should be OK since only VACUUM uses those) and we
* can't be holding any locks on temporary objects (since that would mess
* up the current backend if it tries to exit before the prepared xact is
* committed).
*/
void
AtPrepare_Locks(void)
{
LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD;
HASH_SEQ_STATUS status;
LOCALLOCK *locallock;
/*
* We don't need to touch shared memory for this --- all the necessary
* state information is in the locallock table.
*/
hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
TwoPhaseLockRecord record;
LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
int i;
/* Ignore items that are not of the lockmethod to be processed */
if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
continue;
/* Ignore it if we don't actually hold the lock */
if (locallock->nLocks <= 0)
continue;
/* Scan to verify there are no session locks */
for (i = locallock->numLockOwners - 1; i >= 0; i--)
{
/* elog not ereport since this should not happen */
if (lockOwners[i].owner == NULL)
elog(ERROR, "cannot PREPARE when session locks exist");
}
/* Can't handle it if the lock is on a temporary object */
if (locallock->isTempObject)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
/*
* Create a 2PC record.
*/
memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
record.lockmode = locallock->tag.mode;
RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
&record, sizeof(TwoPhaseLockRecord));
}
}
/*
* PostPrepare_Locks
* Clean up after successful PREPARE
*
* Here, we want to transfer ownership of our locks to a dummy PGPROC
* that's now associated with the prepared transaction, and we want to
* clean out the corresponding entries in the LOCALLOCK table.
*
* Note: by removing the LOCALLOCK entries, we are leaving dangling
* pointers in the transaction's resource owner. This is OK at the
* moment since resowner.c doesn't try to free locks retail at a toplevel
* transaction commit or abort. We could alternatively zero out nLocks
* and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
* but that probably costs more cycles.
*/
void
PostPrepare_Locks(TransactionId xid)
{
PGPROC *newproc = TwoPhaseGetDummyProc(xid);
LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD;
HASH_SEQ_STATUS status;
SHM_QUEUE *procLocks = &(MyProc->procLocks);
LWLockId masterLock;
LockMethod lockMethodTable;
int numLockModes;
LOCALLOCK *locallock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
bool found;
LOCK *lock;
/* This is a critical section: any error means big trouble */
START_CRIT_SECTION();
lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
numLockModes = lockMethodTable->numLockModes;
masterLock = lockMethodTable->masterLock;
/*
* First we run through the locallock table and get rid of unwanted
* entries, then we scan the process's proclocks and transfer them
* to the target proc.
*
* We do this separately because we may have multiple locallock
* entries pointing to the same proclock, and we daren't end up with
* any dangling pointers.
*/
hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
if (locallock->proclock == NULL || locallock->lock == NULL)
{
/*
* We must've run out of shared memory while trying to set up
* this lock. Just forget the local entry.
*/
Assert(locallock->nLocks == 0);
RemoveLocalLock(locallock);
continue;
}
/* Ignore items that are not of the lockmethod to be removed */
if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
continue;
/* We already checked there are no session locks */
/* Mark the proclock to show we need to release this lockmode */
if (locallock->nLocks > 0)
locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
/* And remove the locallock hashtable entry */
RemoveLocalLock(locallock);
}
LWLockAcquire(masterLock, LW_EXCLUSIVE);
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, procLink));
while (proclock)
{
PROCLOCK *nextplock;
LOCKMASK holdMask;
PROCLOCK *newproclock;
/* Get link first, since we may unlink/delete this proclock */
nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
/* Ignore items that are not of the lockmethod to be removed */
if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
goto next_item;
PROCLOCK_PRINT("PostPrepare_Locks", proclock);
LOCK_PRINT("PostPrepare_Locks", lock, 0);
Assert(lock->nRequested >= 0);
Assert(lock->nGranted >= 0);
Assert(lock->nGranted <= lock->nRequested);
Assert((proclock->holdMask & ~lock->grantMask) == 0);
/*
* Since there were no session locks, we should be releasing all locks
*/
if (proclock->releaseMask != proclock->holdMask)
elog(PANIC, "we seem to have dropped a bit somewhere");
holdMask = proclock->holdMask;
/*
* We cannot simply modify proclock->tag.proc to reassign ownership
* of the lock, because that's part of the hash key and the proclock
* would then be in the wrong hash chain. So, unlink and delete the
* old proclock; create a new one with the right contents; and link
* it into place. We do it in this order to be certain we won't
* run out of shared memory (the way dynahash.c works, the deleted
* object is certain to be available for reallocation).
*/
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
if (!hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &(proclock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "proclock table corrupted");
/*
* Create the hash key for the new proclock table.
*/
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(newproc);
newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &proclocktag,
HASH_ENTER_NULL, &found);
if (!newproclock)
ereport(PANIC, /* should not happen */
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
/*
* If new, initialize the new entry
*/
if (!found)
{
newproclock->holdMask = 0;
newproclock->releaseMask = 0;
/* Add new proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
SHMQueueInsertBefore(&newproc->procLocks, &newproclock->procLink);
PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
}
else
{
PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
Assert((newproclock->holdMask & ~lock->grantMask) == 0);
}
/*
* Pass over the identified lock ownership.
*/
Assert((newproclock->holdMask & holdMask) == 0);
newproclock->holdMask |= holdMask;
next_item:
proclock = nextplock;
}
LWLockRelease(masterLock);
END_CRIT_SECTION();
}
/*
* Estimate shared-memory space used for lock tables
*/
int
LockShmemSize(int maxBackends)
LockShmemSize(void)
{
int size = 0;
long max_table_size = NLOCKENTS(maxBackends);
long max_table_size = NLOCKENTS();
/* lock method headers */
size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
@@ -1704,21 +1960,19 @@ GetLockmodeName(LOCKMODE mode)
#ifdef LOCK_DEBUG
/*
* Dump all locks in the MyProc->procLocks list.
* Dump all locks in the given proc's procLocks list.
*
* Must have already acquired the masterLock.
*/
void
DumpLocks(void)
DumpLocks(PGPROC *proc)
{
PGPROC *proc;
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
int lockmethodid = DEFAULT_LOCKMETHOD;
LockMethod lockMethodTable;
proc = MyProc;
if (proc == NULL)
return;
@@ -1793,3 +2047,254 @@ DumpAllLocks(void)
}
#endif /* LOCK_DEBUG */
/*
* LOCK 2PC resource manager's routines
*/
/*
* Re-acquire a lock belonging to a transaction that was prepared.
*
* Because this function is run at db startup, re-acquiring the locks should
* never conflict with running transactions because there are none. We
* assume that the lock state represented by the stored 2PC files is legal.
*/
void
lock_twophase_recover(TransactionId xid, uint16 info,
void *recdata, uint32 len)
{
TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
PGPROC *proc = TwoPhaseGetDummyProc(xid);
LOCKTAG *locktag;
LOCKMODE lockmode;
LOCKMETHODID lockmethodid;
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
bool found;
LWLockId masterLock;
LockMethod lockMethodTable;
Assert(len == sizeof(TwoPhaseLockRecord));
locktag = &rec->locktag;
lockmode = rec->lockmode;
lockmethodid = locktag->locktag_lockmethodid;
Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
masterLock = lockMethodTable->masterLock;
LWLockAcquire(masterLock, LW_EXCLUSIVE);
/*
* Find or create a lock with this tag.
*/
lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
(void *) locktag,
HASH_ENTER_NULL, &found);
if (!lock)
{
LWLockRelease(masterLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You may need to increase max_locks_per_transaction.")));
}
/*
* if it's a new lock object, initialize it
*/
if (!found)
{
lock->grantMask = 0;
lock->waitMask = 0;
SHMQueueInit(&(lock->procLocks));
ProcQueueInit(&(lock->waitProcs));
lock->nRequested = 0;
lock->nGranted = 0;
MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
}
else
{
LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
}
/*
* Create the hash key for the proclock table.
*/
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(proc);
/*
* Find or create a proclock entry with this tag
*/
proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &proclocktag,
HASH_ENTER_NULL, &found);
if (!proclock)
{
/* Ooops, not enough shmem for the proclock */
if (lock->nRequested == 0)
{
/*
* There are no other requestors of this lock, so garbage-collect
* the lock object. We *must* do this to avoid a permanent leak
* of shared memory, because there won't be anything to cause
* anyone to release the lock object later.
*/
Assert(SHMQueueEmpty(&(lock->procLocks)));
if (!hash_search(LockMethodLockHash[lockmethodid],
(void *) &(lock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "lock table corrupted");
}
LWLockRelease(masterLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You may need to increase max_locks_per_transaction.")));
}
/*
* If new, initialize the new entry
*/
if (!found)
{
proclock->holdMask = 0;
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&proc->procLocks, &proclock->procLink);
PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
}
else
{
PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
Assert((proclock->holdMask & ~lock->grantMask) == 0);
}
/*
* lock->nRequested and lock->requested[] count the total number of
* requests, whether granted or waiting, so increment those
* immediately.
*/
lock->nRequested++;
lock->requested[lockmode]++;
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/*
* We shouldn't already hold the desired lock.
*/
if (proclock->holdMask & LOCKBIT_ON(lockmode))
elog(ERROR, "lock %s on object %u/%u/%u is already held",
lock_mode_names[lockmode],
lock->tag.locktag_field1, lock->tag.locktag_field2,
lock->tag.locktag_field3);
/*
* We ignore any possible conflicts and just grant ourselves the lock.
*/
GrantLock(lock, proclock, lockmode);
LWLockRelease(masterLock);
}
/*
* 2PC processing routine for COMMIT PREPARED case.
*
* Find and release the lock indicated by the 2PC record.
*/
void
lock_twophase_postcommit(TransactionId xid, uint16 info,
void *recdata, uint32 len)
{
TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
PGPROC *proc = TwoPhaseGetDummyProc(xid);
LOCKTAG *locktag;
LOCKMODE lockmode;
LOCKMETHODID lockmethodid;
PROCLOCKTAG proclocktag;
LOCK *lock;
PROCLOCK *proclock;
LWLockId masterLock;
LockMethod lockMethodTable;
bool wakeupNeeded;
Assert(len == sizeof(TwoPhaseLockRecord));
locktag = &rec->locktag;
lockmode = rec->lockmode;
lockmethodid = locktag->locktag_lockmethodid;
Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
masterLock = lockMethodTable->masterLock;
LWLockAcquire(masterLock, LW_EXCLUSIVE);
/*
* Re-find the lock object (it had better be there).
*/
lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
(void *) locktag,
HASH_FIND, NULL);
if (!lock)
elog(PANIC, "failed to re-find shared lock object");
/*
* Re-find the proclock object (ditto).
*/
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(proc);
proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &proclocktag,
HASH_FIND, NULL);
if (!proclock)
elog(PANIC, "failed to re-find shared proclock object");
/*
* Double-check that we are actually holding a lock of the type we
* want to release.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
LWLockRelease(masterLock);
elog(WARNING, "you don't own a lock of type %s",
lock_mode_names[lockmode]);
return;
}
/*
* Do the releasing. CleanUpLock will waken any now-wakable waiters.
*/
wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
LWLockRelease(masterLock);
}
/*
* 2PC processing routine for ROLLBACK PREPARED case.
*
* This is actually just the same as the COMMIT case.
*/
void
lock_twophase_postabort(TransactionId xid, uint16 info,
void *recdata, uint32 len)
{
lock_twophase_postcommit(xid, info, recdata, len);
}