|
|
|
|
@@ -159,7 +159,7 @@ typedef struct GlobalTransactionData
|
|
|
|
|
*/
|
|
|
|
|
XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
|
|
|
|
|
XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
|
|
|
|
|
TransactionId xid; /* The GXACT id */
|
|
|
|
|
FullTransactionId fxid; /* The GXACT full xid */
|
|
|
|
|
|
|
|
|
|
Oid owner; /* ID of user that executed the xact */
|
|
|
|
|
ProcNumber locking_backend; /* backend currently working on the xact */
|
|
|
|
|
@@ -197,6 +197,7 @@ static GlobalTransaction MyLockedGxact = NULL;
|
|
|
|
|
|
|
|
|
|
static bool twophaseExitRegistered = false;
|
|
|
|
|
|
|
|
|
|
static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
|
|
|
|
|
static void RecordTransactionCommitPrepared(TransactionId xid,
|
|
|
|
|
int nchildren,
|
|
|
|
|
TransactionId *children,
|
|
|
|
|
@@ -216,19 +217,19 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
|
|
|
|
|
int nstats,
|
|
|
|
|
xl_xact_stats_item *stats,
|
|
|
|
|
const char *gid);
|
|
|
|
|
static void ProcessRecords(char *bufptr, TransactionId xid,
|
|
|
|
|
static void ProcessRecords(char *bufptr, FullTransactionId fxid,
|
|
|
|
|
const TwoPhaseCallback callbacks[]);
|
|
|
|
|
static void RemoveGXact(GlobalTransaction gxact);
|
|
|
|
|
|
|
|
|
|
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
|
|
|
|
|
static char *ProcessTwoPhaseBuffer(TransactionId xid,
|
|
|
|
|
static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
|
|
|
|
|
XLogRecPtr prepare_start_lsn,
|
|
|
|
|
bool fromdisk, bool setParent, bool setNextXid);
|
|
|
|
|
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
|
|
|
|
|
static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
|
|
|
|
|
const char *gid, TimestampTz prepared_at, Oid owner,
|
|
|
|
|
Oid databaseid);
|
|
|
|
|
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
|
|
|
|
|
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
|
|
|
|
|
static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
|
|
|
|
|
static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Initialization of shared memory
|
|
|
|
|
@@ -356,7 +357,7 @@ PostPrepare_Twophase(void)
|
|
|
|
|
* Reserve the GID for the given transaction.
|
|
|
|
|
*/
|
|
|
|
|
GlobalTransaction
|
|
|
|
|
MarkAsPreparing(TransactionId xid, const char *gid,
|
|
|
|
|
MarkAsPreparing(FullTransactionId fxid, const char *gid,
|
|
|
|
|
TimestampTz prepared_at, Oid owner, Oid databaseid)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction gxact;
|
|
|
|
|
@@ -407,7 +408,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
|
|
|
|
|
gxact = TwoPhaseState->freeGXacts;
|
|
|
|
|
TwoPhaseState->freeGXacts = gxact->next;
|
|
|
|
|
|
|
|
|
|
MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
|
|
|
|
|
MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
|
|
|
|
|
|
|
|
|
|
gxact->ondisk = false;
|
|
|
|
|
|
|
|
|
|
@@ -430,11 +431,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
|
|
|
|
|
* Note: This function should be called with appropriate locks held.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
|
|
|
|
|
TimestampTz prepared_at, Oid owner, Oid databaseid)
|
|
|
|
|
MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
|
|
|
|
|
const char *gid, TimestampTz prepared_at, Oid owner,
|
|
|
|
|
Oid databaseid)
|
|
|
|
|
{
|
|
|
|
|
PGPROC *proc;
|
|
|
|
|
int i;
|
|
|
|
|
TransactionId xid = XidFromFullTransactionId(fxid);
|
|
|
|
|
|
|
|
|
|
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
|
|
|
|
|
|
|
|
|
|
@@ -479,7 +482,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
|
|
|
|
|
proc->subxidStatus.count = 0;
|
|
|
|
|
|
|
|
|
|
gxact->prepared_at = prepared_at;
|
|
|
|
|
gxact->xid = xid;
|
|
|
|
|
gxact->fxid = fxid;
|
|
|
|
|
gxact->owner = owner;
|
|
|
|
|
gxact->locking_backend = MyProcNumber;
|
|
|
|
|
gxact->valid = false;
|
|
|
|
|
@@ -797,12 +800,12 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
|
|
|
|
|
* caller had better hold it.
|
|
|
|
|
*/
|
|
|
|
|
static GlobalTransaction
|
|
|
|
|
TwoPhaseGetGXact(TransactionId xid, bool lock_held)
|
|
|
|
|
TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction result = NULL;
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
static TransactionId cached_xid = InvalidTransactionId;
|
|
|
|
|
static FullTransactionId cached_fxid = {InvalidTransactionId};
|
|
|
|
|
static GlobalTransaction cached_gxact = NULL;
|
|
|
|
|
|
|
|
|
|
Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
|
|
|
|
|
@@ -811,7 +814,7 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
|
|
|
|
|
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
|
|
|
|
|
* repeatedly for the same XID. We can save work with a simple cache.
|
|
|
|
|
*/
|
|
|
|
|
if (xid == cached_xid)
|
|
|
|
|
if (FullTransactionIdEquals(fxid, cached_fxid))
|
|
|
|
|
return cached_gxact;
|
|
|
|
|
|
|
|
|
|
if (!lock_held)
|
|
|
|
|
@@ -821,7 +824,7 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
|
|
|
|
|
|
|
|
|
if (gxact->xid == xid)
|
|
|
|
|
if (FullTransactionIdEquals(gxact->fxid, fxid))
|
|
|
|
|
{
|
|
|
|
|
result = gxact;
|
|
|
|
|
break;
|
|
|
|
|
@@ -832,9 +835,10 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
|
|
|
|
|
LWLockRelease(TwoPhaseStateLock);
|
|
|
|
|
|
|
|
|
|
if (result == NULL) /* should not happen */
|
|
|
|
|
elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
|
|
|
|
|
elog(ERROR, "failed to find GlobalTransaction for xid %u",
|
|
|
|
|
XidFromFullTransactionId(fxid));
|
|
|
|
|
|
|
|
|
|
cached_xid = xid;
|
|
|
|
|
cached_fxid = fxid;
|
|
|
|
|
cached_gxact = result;
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
@@ -881,7 +885,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
|
|
|
|
|
*have_more = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
result = gxact->xid;
|
|
|
|
|
result = XidFromFullTransactionId(gxact->fxid);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -892,7 +896,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* TwoPhaseGetDummyProcNumber
|
|
|
|
|
* Get the dummy proc number for prepared transaction specified by XID
|
|
|
|
|
* Get the dummy proc number for prepared transaction
|
|
|
|
|
*
|
|
|
|
|
* Dummy proc numbers are similar to proc numbers of real backends. They
|
|
|
|
|
* start at MaxBackends, and are unique across all currently active real
|
|
|
|
|
@@ -900,24 +904,24 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
|
|
|
|
|
* TwoPhaseStateLock will not be taken, so the caller had better hold it.
|
|
|
|
|
*/
|
|
|
|
|
ProcNumber
|
|
|
|
|
TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held)
|
|
|
|
|
TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
|
|
|
|
|
|
|
|
|
|
return gxact->pgprocno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* TwoPhaseGetDummyProc
|
|
|
|
|
* Get the PGPROC that represents a prepared transaction specified by XID
|
|
|
|
|
* Get the PGPROC that represents a prepared transaction
|
|
|
|
|
*
|
|
|
|
|
* If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
|
|
|
|
|
* caller had better hold it.
|
|
|
|
|
*/
|
|
|
|
|
PGPROC *
|
|
|
|
|
TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
|
|
|
|
|
TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
|
|
|
|
|
|
|
|
|
|
return GetPGProcByNumber(gxact->pgprocno);
|
|
|
|
|
}
|
|
|
|
|
@@ -942,10 +946,8 @@ AdjustToFullTransactionId(TransactionId xid)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static inline int
|
|
|
|
|
TwoPhaseFilePath(char *path, TransactionId xid)
|
|
|
|
|
TwoPhaseFilePath(char *path, FullTransactionId fxid)
|
|
|
|
|
{
|
|
|
|
|
FullTransactionId fxid = AdjustToFullTransactionId(xid);
|
|
|
|
|
|
|
|
|
|
return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
|
|
|
|
|
EpochFromFullTransactionId(fxid),
|
|
|
|
|
XidFromFullTransactionId(fxid));
|
|
|
|
|
@@ -1049,7 +1051,7 @@ void
|
|
|
|
|
StartPrepare(GlobalTransaction gxact)
|
|
|
|
|
{
|
|
|
|
|
PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
|
|
|
|
|
TransactionId xid = gxact->xid;
|
|
|
|
|
TransactionId xid = XidFromFullTransactionId(gxact->fxid);
|
|
|
|
|
TwoPhaseFileHeader hdr;
|
|
|
|
|
TransactionId *children;
|
|
|
|
|
RelFileLocator *commitrels;
|
|
|
|
|
@@ -1281,10 +1283,11 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
|
|
|
|
|
* If it looks OK (has a valid magic number and CRC), return the palloc'd
|
|
|
|
|
* contents of the file, issuing an error when finding corrupted data. If
|
|
|
|
|
* missing_ok is true, which indicates that missing files can be safely
|
|
|
|
|
* ignored, then return NULL. This state can be reached when doing recovery.
|
|
|
|
|
* ignored, then return NULL. This state can be reached when doing recovery
|
|
|
|
|
* after discarding two-phase files from frozen epochs.
|
|
|
|
|
*/
|
|
|
|
|
static char *
|
|
|
|
|
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
|
|
|
|
|
ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
|
|
|
|
|
{
|
|
|
|
|
char path[MAXPGPATH];
|
|
|
|
|
char *buf;
|
|
|
|
|
@@ -1296,7 +1299,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
|
|
|
|
|
file_crc;
|
|
|
|
|
int r;
|
|
|
|
|
|
|
|
|
|
TwoPhaseFilePath(path, xid);
|
|
|
|
|
TwoPhaseFilePath(path, fxid);
|
|
|
|
|
|
|
|
|
|
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
|
|
|
|
if (fd < 0)
|
|
|
|
|
@@ -1461,6 +1464,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
|
|
|
|
|
char *buf;
|
|
|
|
|
TwoPhaseFileHeader *hdr;
|
|
|
|
|
bool result;
|
|
|
|
|
FullTransactionId fxid;
|
|
|
|
|
|
|
|
|
|
Assert(TransactionIdIsValid(xid));
|
|
|
|
|
|
|
|
|
|
@@ -1468,7 +1472,8 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
|
|
|
|
|
return false; /* nothing to do */
|
|
|
|
|
|
|
|
|
|
/* Read and validate file */
|
|
|
|
|
buf = ReadTwoPhaseFile(xid, true);
|
|
|
|
|
fxid = AdjustToFullTransactionId(xid);
|
|
|
|
|
buf = ReadTwoPhaseFile(fxid, true);
|
|
|
|
|
if (buf == NULL)
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
@@ -1488,6 +1493,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction gxact;
|
|
|
|
|
PGPROC *proc;
|
|
|
|
|
FullTransactionId fxid;
|
|
|
|
|
TransactionId xid;
|
|
|
|
|
bool ondisk;
|
|
|
|
|
char *buf;
|
|
|
|
|
@@ -1509,7 +1515,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
|
|
|
|
*/
|
|
|
|
|
gxact = LockGXact(gid, GetUserId());
|
|
|
|
|
proc = GetPGProcByNumber(gxact->pgprocno);
|
|
|
|
|
xid = gxact->xid;
|
|
|
|
|
fxid = gxact->fxid;
|
|
|
|
|
xid = XidFromFullTransactionId(fxid);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Read and validate 2PC state data. State data will typically be stored
|
|
|
|
|
@@ -1517,7 +1524,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
|
|
|
|
* to disk if for some reason they have lived for a long time.
|
|
|
|
|
*/
|
|
|
|
|
if (gxact->ondisk)
|
|
|
|
|
buf = ReadTwoPhaseFile(xid, false);
|
|
|
|
|
buf = ReadTwoPhaseFile(fxid, false);
|
|
|
|
|
else
|
|
|
|
|
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
|
|
|
|
|
|
|
|
|
|
@@ -1636,11 +1643,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
|
|
|
|
|
|
|
|
|
/* And now do the callbacks */
|
|
|
|
|
if (isCommit)
|
|
|
|
|
ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
|
|
|
|
|
ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
|
|
|
|
|
else
|
|
|
|
|
ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
|
|
|
|
|
ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);
|
|
|
|
|
|
|
|
|
|
PredicateLockTwoPhaseFinish(xid, isCommit);
|
|
|
|
|
PredicateLockTwoPhaseFinish(fxid, isCommit);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Read this value while holding the two-phase lock, as the on-disk 2PC
|
|
|
|
|
@@ -1664,7 +1671,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
|
|
|
|
* And now we can clean up any files we may have left.
|
|
|
|
|
*/
|
|
|
|
|
if (ondisk)
|
|
|
|
|
RemoveTwoPhaseFile(xid, true);
|
|
|
|
|
RemoveTwoPhaseFile(fxid, true);
|
|
|
|
|
|
|
|
|
|
MyLockedGxact = NULL;
|
|
|
|
|
|
|
|
|
|
@@ -1677,7 +1684,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
|
|
|
|
* Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
ProcessRecords(char *bufptr, TransactionId xid,
|
|
|
|
|
ProcessRecords(char *bufptr, FullTransactionId fxid,
|
|
|
|
|
const TwoPhaseCallback callbacks[])
|
|
|
|
|
{
|
|
|
|
|
for (;;)
|
|
|
|
|
@@ -1691,24 +1698,28 @@ ProcessRecords(char *bufptr, TransactionId xid,
|
|
|
|
|
bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
|
|
|
|
|
|
|
|
|
|
if (callbacks[record->rmid] != NULL)
|
|
|
|
|
callbacks[record->rmid] (xid, record->info, bufptr, record->len);
|
|
|
|
|
callbacks[record->rmid] (fxid, record->info, bufptr, record->len);
|
|
|
|
|
|
|
|
|
|
bufptr += MAXALIGN(record->len);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Remove the 2PC file for the specified XID.
|
|
|
|
|
* Remove the 2PC file.
|
|
|
|
|
*
|
|
|
|
|
* If giveWarning is false, do not complain about file-not-present;
|
|
|
|
|
* this is an expected case during WAL replay.
|
|
|
|
|
*
|
|
|
|
|
* This routine is used at early stages at recovery where future and
|
|
|
|
|
* past orphaned files are checked, hence the FullTransactionId to build
|
|
|
|
|
* a complete file name fit for the removal.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
|
|
|
|
|
RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
|
|
|
|
|
{
|
|
|
|
|
char path[MAXPGPATH];
|
|
|
|
|
|
|
|
|
|
TwoPhaseFilePath(path, xid);
|
|
|
|
|
TwoPhaseFilePath(path, fxid);
|
|
|
|
|
if (unlink(path))
|
|
|
|
|
if (errno != ENOENT || giveWarning)
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
@@ -1723,7 +1734,7 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
|
|
|
|
|
* Note: content and len don't include CRC.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
|
|
|
|
|
RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
|
|
|
|
|
{
|
|
|
|
|
char path[MAXPGPATH];
|
|
|
|
|
pg_crc32c statefile_crc;
|
|
|
|
|
@@ -1734,7 +1745,7 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
|
|
|
|
|
COMP_CRC32C(statefile_crc, content, len);
|
|
|
|
|
FIN_CRC32C(statefile_crc);
|
|
|
|
|
|
|
|
|
|
TwoPhaseFilePath(path, xid);
|
|
|
|
|
TwoPhaseFilePath(path, fxid);
|
|
|
|
|
|
|
|
|
|
fd = OpenTransientFile(path,
|
|
|
|
|
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
|
|
|
|
|
@@ -1846,7 +1857,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
|
|
|
|
|
int len;
|
|
|
|
|
|
|
|
|
|
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
|
|
|
|
|
RecreateTwoPhaseFile(gxact->xid, buf, len);
|
|
|
|
|
RecreateTwoPhaseFile(gxact->fxid, buf, len);
|
|
|
|
|
gxact->ondisk = true;
|
|
|
|
|
gxact->prepare_start_lsn = InvalidXLogRecPtr;
|
|
|
|
|
gxact->prepare_end_lsn = InvalidXLogRecPtr;
|
|
|
|
|
@@ -1897,19 +1908,17 @@ restoreTwoPhaseData(void)
|
|
|
|
|
if (strlen(clde->d_name) == 16 &&
|
|
|
|
|
strspn(clde->d_name, "0123456789ABCDEF") == 16)
|
|
|
|
|
{
|
|
|
|
|
TransactionId xid;
|
|
|
|
|
FullTransactionId fxid;
|
|
|
|
|
char *buf;
|
|
|
|
|
|
|
|
|
|
fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
|
|
|
|
|
xid = XidFromFullTransactionId(fxid);
|
|
|
|
|
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
|
|
|
|
|
true, false, false);
|
|
|
|
|
if (buf == NULL)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
PrepareRedoAdd(buf, InvalidXLogRecPtr,
|
|
|
|
|
PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
|
|
|
|
|
InvalidXLogRecPtr, InvalidRepOriginId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1968,9 +1977,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
|
|
|
|
|
|
|
|
|
|
Assert(gxact->inredo);
|
|
|
|
|
|
|
|
|
|
xid = gxact->xid;
|
|
|
|
|
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(xid,
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(gxact->fxid,
|
|
|
|
|
gxact->prepare_start_lsn,
|
|
|
|
|
gxact->ondisk, false, true);
|
|
|
|
|
|
|
|
|
|
@@ -1981,6 +1988,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
|
|
|
|
|
* OK, we think this file is valid. Incorporate xid into the
|
|
|
|
|
* running-minimum result.
|
|
|
|
|
*/
|
|
|
|
|
xid = XidFromFullTransactionId(gxact->fxid);
|
|
|
|
|
if (TransactionIdPrecedes(xid, result))
|
|
|
|
|
result = xid;
|
|
|
|
|
|
|
|
|
|
@@ -2036,15 +2044,12 @@ StandbyRecoverPreparedTransactions(void)
|
|
|
|
|
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
|
|
|
|
|
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
|
|
|
|
|
{
|
|
|
|
|
TransactionId xid;
|
|
|
|
|
char *buf;
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
|
|
|
|
|
|
|
|
|
Assert(gxact->inredo);
|
|
|
|
|
|
|
|
|
|
xid = gxact->xid;
|
|
|
|
|
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(xid,
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(gxact->fxid,
|
|
|
|
|
gxact->prepare_start_lsn,
|
|
|
|
|
gxact->ondisk, true, false);
|
|
|
|
|
if (buf != NULL)
|
|
|
|
|
@@ -2077,16 +2082,14 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
|
|
|
|
|
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
|
|
|
|
|
{
|
|
|
|
|
TransactionId xid;
|
|
|
|
|
char *buf;
|
|
|
|
|
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
|
|
|
|
|
FullTransactionId fxid = gxact->fxid;
|
|
|
|
|
char *bufptr;
|
|
|
|
|
TwoPhaseFileHeader *hdr;
|
|
|
|
|
TransactionId *subxids;
|
|
|
|
|
const char *gid;
|
|
|
|
|
|
|
|
|
|
xid = gxact->xid;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Reconstruct subtrans state for the transaction --- needed because
|
|
|
|
|
* pg_subtrans is not preserved over a restart. Note that we are
|
|
|
|
|
@@ -2096,17 +2099,20 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
* SubTransSetParent has been set before, if the prepared transaction
|
|
|
|
|
* generated xid assignment records.
|
|
|
|
|
*/
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(xid,
|
|
|
|
|
buf = ProcessTwoPhaseBuffer(gxact->fxid,
|
|
|
|
|
gxact->prepare_start_lsn,
|
|
|
|
|
gxact->ondisk, true, false);
|
|
|
|
|
if (buf == NULL)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("recovering prepared transaction %u from shared memory", xid)));
|
|
|
|
|
(errmsg("recovering prepared transaction %u of epoch %u from shared memory",
|
|
|
|
|
XidFromFullTransactionId(gxact->fxid),
|
|
|
|
|
EpochFromFullTransactionId(gxact->fxid))));
|
|
|
|
|
|
|
|
|
|
hdr = (TwoPhaseFileHeader *) buf;
|
|
|
|
|
Assert(TransactionIdEquals(hdr->xid, xid));
|
|
|
|
|
Assert(TransactionIdEquals(hdr->xid,
|
|
|
|
|
XidFromFullTransactionId(gxact->fxid)));
|
|
|
|
|
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
|
|
|
|
|
gid = (const char *) bufptr;
|
|
|
|
|
bufptr += MAXALIGN(hdr->gidlen);
|
|
|
|
|
@@ -2122,7 +2128,7 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
* Recreate its GXACT and dummy PGPROC. But, check whether it was
|
|
|
|
|
* added in redo and already has a shmem entry for it.
|
|
|
|
|
*/
|
|
|
|
|
MarkAsPreparingGuts(gxact, xid, gid,
|
|
|
|
|
MarkAsPreparingGuts(gxact, gxact->fxid, gid,
|
|
|
|
|
hdr->prepared_at,
|
|
|
|
|
hdr->owner, hdr->database);
|
|
|
|
|
|
|
|
|
|
@@ -2137,7 +2143,7 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
/*
|
|
|
|
|
* Recover other state (notably locks) using resource managers.
|
|
|
|
|
*/
|
|
|
|
|
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
|
|
|
|
|
ProcessRecords(bufptr, fxid, twophase_recover_callbacks);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Release locks held by the standby process after we process each
|
|
|
|
|
@@ -2145,7 +2151,7 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
* additional locks at any one time.
|
|
|
|
|
*/
|
|
|
|
|
if (InHotStandby)
|
|
|
|
|
StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
|
|
|
|
|
StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We're done with recovering this transaction. Clear MyLockedGxact,
|
|
|
|
|
@@ -2164,7 +2170,7 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
/*
|
|
|
|
|
* ProcessTwoPhaseBuffer
|
|
|
|
|
*
|
|
|
|
|
* Given a transaction id, read it either from disk or read it directly
|
|
|
|
|
* Given a FullTransactionId, read it either from disk or read it directly
|
|
|
|
|
* via shmem xlog record pointer using the provided "prepare_start_lsn".
|
|
|
|
|
*
|
|
|
|
|
* If setParent is true, set up subtransaction parent linkages.
|
|
|
|
|
@@ -2173,13 +2179,12 @@ RecoverPreparedTransactions(void)
|
|
|
|
|
* value scanned.
|
|
|
|
|
*/
|
|
|
|
|
static char *
|
|
|
|
|
ProcessTwoPhaseBuffer(TransactionId xid,
|
|
|
|
|
ProcessTwoPhaseBuffer(FullTransactionId fxid,
|
|
|
|
|
XLogRecPtr prepare_start_lsn,
|
|
|
|
|
bool fromdisk,
|
|
|
|
|
bool setParent, bool setNextXid)
|
|
|
|
|
{
|
|
|
|
|
FullTransactionId nextXid = TransamVariables->nextXid;
|
|
|
|
|
TransactionId origNextXid = XidFromFullTransactionId(nextXid);
|
|
|
|
|
TransactionId *subxids;
|
|
|
|
|
char *buf;
|
|
|
|
|
TwoPhaseFileHeader *hdr;
|
|
|
|
|
@@ -2191,41 +2196,46 @@ ProcessTwoPhaseBuffer(TransactionId xid,
|
|
|
|
|
Assert(prepare_start_lsn != InvalidXLogRecPtr);
|
|
|
|
|
|
|
|
|
|
/* Already processed? */
|
|
|
|
|
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
|
|
|
|
|
if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
|
|
|
|
|
TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
|
|
|
|
|
{
|
|
|
|
|
if (fromdisk)
|
|
|
|
|
{
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errmsg("removing stale two-phase state file for transaction %u",
|
|
|
|
|
xid)));
|
|
|
|
|
RemoveTwoPhaseFile(xid, true);
|
|
|
|
|
(errmsg("removing stale two-phase state file for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid))));
|
|
|
|
|
RemoveTwoPhaseFile(fxid, true);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errmsg("removing stale two-phase state from memory for transaction %u",
|
|
|
|
|
xid)));
|
|
|
|
|
PrepareRedoRemove(xid, true);
|
|
|
|
|
(errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid))));
|
|
|
|
|
PrepareRedoRemoveFull(fxid, true);
|
|
|
|
|
}
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Reject XID if too new */
|
|
|
|
|
if (TransactionIdFollowsOrEquals(xid, origNextXid))
|
|
|
|
|
if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
|
|
|
|
|
{
|
|
|
|
|
if (fromdisk)
|
|
|
|
|
{
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errmsg("removing future two-phase state file for transaction %u",
|
|
|
|
|
xid)));
|
|
|
|
|
RemoveTwoPhaseFile(xid, true);
|
|
|
|
|
(errmsg("removing future two-phase state file for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid))));
|
|
|
|
|
RemoveTwoPhaseFile(fxid, true);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errmsg("removing future two-phase state from memory for transaction %u",
|
|
|
|
|
xid)));
|
|
|
|
|
PrepareRedoRemove(xid, true);
|
|
|
|
|
(errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid))));
|
|
|
|
|
PrepareRedoRemoveFull(fxid, true);
|
|
|
|
|
}
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
@@ -2233,7 +2243,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
|
|
|
|
|
if (fromdisk)
|
|
|
|
|
{
|
|
|
|
|
/* Read and validate file */
|
|
|
|
|
buf = ReadTwoPhaseFile(xid, false);
|
|
|
|
|
buf = ReadTwoPhaseFile(fxid, false);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
@@ -2243,18 +2253,20 @@ ProcessTwoPhaseBuffer(TransactionId xid,
|
|
|
|
|
|
|
|
|
|
/* Deconstruct header */
|
|
|
|
|
hdr = (TwoPhaseFileHeader *) buf;
|
|
|
|
|
if (!TransactionIdEquals(hdr->xid, xid))
|
|
|
|
|
if (!TransactionIdEquals(hdr->xid, XidFromFullTransactionId(fxid)))
|
|
|
|
|
{
|
|
|
|
|
if (fromdisk)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_DATA_CORRUPTED),
|
|
|
|
|
errmsg("corrupted two-phase state file for transaction %u",
|
|
|
|
|
xid)));
|
|
|
|
|
errmsg("corrupted two-phase state file for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid))));
|
|
|
|
|
else
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_DATA_CORRUPTED),
|
|
|
|
|
errmsg("corrupted two-phase state in memory for transaction %u",
|
|
|
|
|
xid)));
|
|
|
|
|
errmsg("corrupted two-phase state in memory for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -2268,14 +2280,14 @@ ProcessTwoPhaseBuffer(TransactionId xid,
|
|
|
|
|
{
|
|
|
|
|
TransactionId subxid = subxids[i];
|
|
|
|
|
|
|
|
|
|
Assert(TransactionIdFollows(subxid, xid));
|
|
|
|
|
Assert(TransactionIdFollows(subxid, XidFromFullTransactionId(fxid)));
|
|
|
|
|
|
|
|
|
|
/* update nextXid if needed */
|
|
|
|
|
if (setNextXid)
|
|
|
|
|
AdvanceNextFullTransactionIdPastXid(subxid);
|
|
|
|
|
|
|
|
|
|
if (setParent)
|
|
|
|
|
SubTransSetParent(subxid, xid);
|
|
|
|
|
SubTransSetParent(subxid, XidFromFullTransactionId(fxid));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return buf;
|
|
|
|
|
@@ -2466,8 +2478,9 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
|
|
|
|
* data, the entry is marked as located on disk.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
|
|
|
|
|
XLogRecPtr end_lsn, RepOriginId origin_id)
|
|
|
|
|
PrepareRedoAdd(FullTransactionId fxid, char *buf,
|
|
|
|
|
XLogRecPtr start_lsn, XLogRecPtr end_lsn,
|
|
|
|
|
RepOriginId origin_id)
|
|
|
|
|
{
|
|
|
|
|
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
|
|
|
|
|
char *bufptr;
|
|
|
|
|
@@ -2477,6 +2490,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
|
|
|
|
|
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
|
|
|
|
|
Assert(RecoveryInProgress());
|
|
|
|
|
|
|
|
|
|
if (!FullTransactionIdIsValid(fxid))
|
|
|
|
|
{
|
|
|
|
|
Assert(InRecovery);
|
|
|
|
|
fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
|
|
|
|
|
hdr->xid);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
|
|
|
|
|
gid = (const char *) bufptr;
|
|
|
|
|
|
|
|
|
|
@@ -2505,7 +2525,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
|
|
|
|
|
{
|
|
|
|
|
char path[MAXPGPATH];
|
|
|
|
|
|
|
|
|
|
TwoPhaseFilePath(path, hdr->xid);
|
|
|
|
|
Assert(InRecovery);
|
|
|
|
|
TwoPhaseFilePath(path, fxid);
|
|
|
|
|
|
|
|
|
|
if (access(path, F_OK) == 0)
|
|
|
|
|
{
|
|
|
|
|
@@ -2536,7 +2557,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
|
|
|
|
|
gxact->prepared_at = hdr->prepared_at;
|
|
|
|
|
gxact->prepare_start_lsn = start_lsn;
|
|
|
|
|
gxact->prepare_end_lsn = end_lsn;
|
|
|
|
|
gxact->xid = hdr->xid;
|
|
|
|
|
gxact->fxid = fxid;
|
|
|
|
|
gxact->owner = hdr->owner;
|
|
|
|
|
gxact->locking_backend = INVALID_PROC_NUMBER;
|
|
|
|
|
gxact->valid = false;
|
|
|
|
|
@@ -2555,11 +2576,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
|
|
|
|
|
false /* backward */ , false /* WAL */ );
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
|
|
|
|
|
elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
|
|
|
|
|
XidFromFullTransactionId(gxact->fxid),
|
|
|
|
|
EpochFromFullTransactionId(gxact->fxid));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* PrepareRedoRemove
|
|
|
|
|
* PrepareRedoRemoveFull
|
|
|
|
|
*
|
|
|
|
|
* Remove the corresponding gxact entry from TwoPhaseState. Also remove
|
|
|
|
|
* the 2PC file if a prepared transaction was saved via an earlier checkpoint.
|
|
|
|
|
@@ -2567,8 +2590,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
|
|
|
|
|
* Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
|
|
|
|
|
* is updated.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
PrepareRedoRemove(TransactionId xid, bool giveWarning)
|
|
|
|
|
static void
|
|
|
|
|
PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
|
|
|
|
|
{
|
|
|
|
|
GlobalTransaction gxact = NULL;
|
|
|
|
|
int i;
|
|
|
|
|
@@ -2581,7 +2604,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
|
|
|
|
|
{
|
|
|
|
|
gxact = TwoPhaseState->prepXacts[i];
|
|
|
|
|
|
|
|
|
|
if (gxact->xid == xid)
|
|
|
|
|
if (FullTransactionIdEquals(gxact->fxid, fxid))
|
|
|
|
|
{
|
|
|
|
|
Assert(gxact->inredo);
|
|
|
|
|
found = true;
|
|
|
|
|
@@ -2598,12 +2621,28 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
|
|
|
|
|
/*
|
|
|
|
|
* And now we can clean up any files we may have left.
|
|
|
|
|
*/
|
|
|
|
|
elog(DEBUG2, "removing 2PC data for transaction %u", xid);
|
|
|
|
|
elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
|
|
|
|
|
XidFromFullTransactionId(fxid),
|
|
|
|
|
EpochFromFullTransactionId(fxid));
|
|
|
|
|
|
|
|
|
|
if (gxact->ondisk)
|
|
|
|
|
RemoveTwoPhaseFile(xid, giveWarning);
|
|
|
|
|
RemoveTwoPhaseFile(fxid, giveWarning);
|
|
|
|
|
|
|
|
|
|
RemoveGXact(gxact);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Wrapper of PrepareRedoRemoveFull(), for TransactionIds.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
PrepareRedoRemove(TransactionId xid, bool giveWarning)
|
|
|
|
|
{
|
|
|
|
|
FullTransactionId fxid =
|
|
|
|
|
FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
|
|
|
|
|
|
|
|
|
|
PrepareRedoRemoveFull(fxid, giveWarning);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* LookupGXact
|
|
|
|
|
* Check if the prepared transaction with the given GID, lsn and timestamp
|
|
|
|
|
@@ -2648,7 +2687,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
|
|
|
|
|
* between publisher and subscriber.
|
|
|
|
|
*/
|
|
|
|
|
if (gxact->ondisk)
|
|
|
|
|
buf = ReadTwoPhaseFile(gxact->xid, false);
|
|
|
|
|
buf = ReadTwoPhaseFile(gxact->fxid, false);
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
Assert(gxact->prepare_start_lsn);
|
|
|
|
|
|