diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a3190dc4f1a..ab2f4a8a92f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid, static void RemoveGXact(GlobalTransaction gxact); static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); -static char *ProcessTwoPhaseBuffer(FullTransactionId xid, +static char *ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid); static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); -static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning); +static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); /* @@ -927,26 +927,41 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held) /************************************************************************/ /* - * Compute FullTransactionId for the given TransactionId, using the current - * epoch. + * Compute the FullTransactionId for the given TransactionId. + * + * The wrap logic is safe here because the span of active xids cannot exceed one + * epoch at any given time. 
*/ static inline FullTransactionId -FullTransactionIdFromCurrentEpoch(TransactionId xid) +AdjustToFullTransactionId(TransactionId xid) { - FullTransactionId fxid; FullTransactionId nextFullXid; + TransactionId nextXid; uint32 epoch; - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); + Assert(TransactionIdIsValid(xid)); - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - return fxid; + LWLockAcquire(XidGenLock, LW_SHARED); + nextFullXid = TransamVariables->nextXid; + LWLockRelease(XidGenLock); + + nextXid = XidFromFullTransactionId(nextFullXid); + epoch = EpochFromFullTransactionId(nextFullXid); + if (unlikely(xid > nextXid)) + { + /* xid is past nextXid, so wraparound occurred: it belongs to the previous epoch. */ + Assert(epoch > 0); + epoch--; + } + + return FullTransactionIdFromEpochAndXid(epoch, xid); } static inline int -TwoPhaseFilePath(char *path, FullTransactionId fxid) +TwoPhaseFilePath(char *path, TransactionId xid) { + FullTransactionId fxid = AdjustToFullTransactionId(xid); + return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X", EpochFromFullTransactionId(fxid), XidFromFullTransactionId(fxid)); @@ -1282,8 +1297,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * If it looks OK (has a valid magic number and CRC), return the palloc'd * contents of the file, issuing an error when finding corrupted data. If * missing_ok is true, which indicates that missing files can be safely - * ignored, then return NULL. This state can be reached when doing recovery - * after discarding two-phase files from other epochs. + * ignored, then return NULL. This state can be reached when doing recovery. 
*/ static char * ReadTwoPhaseFile(TransactionId xid, bool missing_ok) @@ -1297,10 +1311,8 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) pg_crc32c calc_crc, file_crc; int r; - FullTransactionId fxid; - fxid = FullTransactionIdFromCurrentEpoch(xid); - TwoPhaseFilePath(path, fxid); + TwoPhaseFilePath(path, xid); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); if (fd < 0) @@ -1665,16 +1677,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit) AtEOXact_PgStat(isCommit, false); /* - * And now we can clean up any files we may have left. These should be - * from the current epoch. + * And now we can clean up any files we may have left. */ if (ondisk) - { - FullTransactionId fxid; - - fxid = FullTransactionIdFromCurrentEpoch(xid); - RemoveTwoPhaseFile(fxid, true); - } + RemoveTwoPhaseFile(xid, true); MyLockedGxact = NULL; @@ -1712,17 +1718,13 @@ ProcessRecords(char *bufptr, TransactionId xid, * * If giveWarning is false, do not complain about file-not-present; * this is an expected case during WAL replay. - * - * This routine is used at early stages at recovery where future and - * past orphaned files are checked, hence the FullTransactionId to build - * a complete file name fit for the removal. 
*/ static void -RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning) +RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) { char path[MAXPGPATH]; - TwoPhaseFilePath(path, fxid); + TwoPhaseFilePath(path, xid); if (unlink(path)) if (errno != ENOENT || giveWarning) ereport(WARNING, @@ -1742,16 +1744,13 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) char path[MAXPGPATH]; pg_crc32c statefile_crc; int fd; - FullTransactionId fxid; /* Recompute CRC */ INIT_CRC32C(statefile_crc); COMP_CRC32C(statefile_crc, content, len); FIN_CRC32C(statefile_crc); - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(xid); - TwoPhaseFilePath(path, fxid); + TwoPhaseFilePath(path, xid); fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY); @@ -1899,9 +1898,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data. * This is called once at the beginning of recovery, saving any extra * lookups in the future. Two-phase files that are newer than the - * minimum XID horizon are discarded on the way. Two-phase files with - * an epoch older or newer than the current checkpoint's record epoch - * are also discarded. + * minimum XID horizon are discarded on the way. 
*/ void restoreTwoPhaseData(void) @@ -1916,11 +1913,14 @@ restoreTwoPhaseData(void) if (strlen(clde->d_name) == 16 && strspn(clde->d_name, "0123456789ABCDEF") == 16) { + TransactionId xid; FullTransactionId fxid; char *buf; fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16)); - buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr, + xid = XidFromFullTransactionId(fxid); + + buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr, true, false, false); if (buf == NULL) continue; @@ -1971,7 +1971,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId result = origNextXid; TransactionId *xids = NULL; - uint32 epoch = EpochFromFullTransactionId(nextXid); int nxids = 0; int allocsize = 0; int i; @@ -1980,7 +1979,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; - FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; @@ -1988,12 +1986,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) xid = gxact->xid; - /* - * All two-phase files with past and future epoch in pg_twophase are - * gone at this point, so we're OK to rely on only the current epoch. 
- */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, false, true); @@ -2055,18 +2048,11 @@ void StandbyRecoverPreparedTransactions(void) { int i; - uint32 epoch; - FullTransactionId nextFullXid; - - /* get current epoch */ - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; - FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; @@ -2074,12 +2060,7 @@ StandbyRecoverPreparedTransactions(void) xid = gxact->xid; - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf != NULL) @@ -2108,18 +2089,11 @@ void RecoverPreparedTransactions(void) { int i; - uint32 epoch; - FullTransactionId nextFullXid; - - /* get current epoch */ - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; - FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; char *bufptr; @@ -2127,10 +2101,6 @@ RecoverPreparedTransactions(void) TransactionId *subxids; const char *gid; - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ xid = gxact->xid; /* @@ -2142,8 +2112,7 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. 
*/ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf == NULL) @@ -2211,7 +2180,7 @@ RecoverPreparedTransactions(void) /* * ProcessTwoPhaseBuffer * - * Given a FullTransactionId, read it either from disk or read it directly + * Given a transaction id, read it either from disk or read it directly * via shmem xlog record pointer using the provided "prepare_start_lsn". * * If setParent is true, set up subtransaction parent linkages. @@ -2220,66 +2189,23 @@ RecoverPreparedTransactions(void) * value scanned. */ static char * -ProcessTwoPhaseBuffer(FullTransactionId fxid, +ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid) { FullTransactionId nextXid = TransamVariables->nextXid; + TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId *subxids; char *buf; TwoPhaseFileHeader *hdr; int i; - TransactionId xid = XidFromFullTransactionId(fxid); Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); - /* - * Reject full XID if too new. Note that this discards files from future - * epochs. 
- */ - if (FullTransactionIdFollowsOrEquals(fxid, nextXid)) - { - if (fromdisk) - { - ereport(WARNING, - (errmsg("removing future two-phase state file of epoch %u for transaction %u", - EpochFromFullTransactionId(fxid), xid))); - RemoveTwoPhaseFile(fxid, true); - } - else - { - ereport(WARNING, - (errmsg("removing future two-phase state from memory for transaction %u", - xid))); - PrepareRedoRemove(xid, true); - } - return NULL; - } - - /* Discard files from past epochs */ - if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid)) - { - if (fromdisk) - { - ereport(WARNING, - (errmsg("removing past two-phase state file of epoch %u for transaction %u", - EpochFromFullTransactionId(fxid), xid))); - RemoveTwoPhaseFile(fxid, true); - } - else - { - ereport(WARNING, - (errmsg("removing past two-phase state from memory for transaction %u", - xid))); - PrepareRedoRemove(xid, true); - } - return NULL; - } - /* Already processed? */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { @@ -2288,7 +2214,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, ereport(WARNING, (errmsg("removing stale two-phase state file for transaction %u", xid))); - RemoveTwoPhaseFile(fxid, true); + RemoveTwoPhaseFile(xid, true); } else { @@ -2300,6 +2226,26 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, return NULL; } + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing future two-phase state file for transaction %u", + xid))); + RemoveTwoPhaseFile(xid, true); + } + else + { + ereport(WARNING, + (errmsg("removing future two-phase state from memory for transaction %u", + xid))); + PrepareRedoRemove(xid, true); + } + return NULL; + } + if (fromdisk) { /* Read and validate file */ @@ -2574,11 +2520,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, if (!XLogRecPtrIsInvalid(start_lsn)) { char path[MAXPGPATH]; - FullTransactionId fxid; - /* Use current epoch */ - fxid = 
FullTransactionIdFromCurrentEpoch(hdr->xid); - TwoPhaseFilePath(path, fxid); + TwoPhaseFilePath(path, hdr->xid); if (access(path, F_OK) == 0) { @@ -2673,15 +2616,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) */ elog(DEBUG2, "removing 2PC data for transaction %u", xid); if (gxact->ondisk) - { - FullTransactionId fxid; - - /* - * We should deal with a file at the current epoch here. - */ - fxid = FullTransactionIdFromCurrentEpoch(xid); - RemoveTwoPhaseFile(fxid, giveWarning); - } + RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl index cf61a2f3285..1a662ebe499 100644 --- a/src/test/recovery/t/009_twophase.pl +++ b/src/test/recovery/t/009_twophase.pl @@ -572,38 +572,4 @@ my $nsubtrans = $cur_primary->safe_psql('postgres', ); isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed"); -############################################################################### -# Check handling of orphaned 2PC files at recovery. -############################################################################### - -$cur_standby->teardown_node; -$cur_primary->teardown_node; - -# Grab location in logs of primary -my $log_offset = -s $cur_primary->logfile; - -# Create fake files with a transaction ID large or low enough to be in the -# future or the past, in different epochs, then check that the primary is able -# to start and remove these files at recovery. - -# First bump the epoch with pg_resetwal. -$cur_primary->command_ok( - [ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ], - 'bump epoch of primary'); - -my $future_2pc_file = - $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF'; -append_to_file $future_2pc_file, ""; -my $past_2pc_file = $cur_primary->data_dir . 
'/pg_twophase/000000EE00000FFF'; -append_to_file $past_2pc_file, ""; - -$cur_primary->start; -$cur_primary->log_check( - "two-phase files removed at recovery", - $log_offset, - log_like => [ - qr/removing past two-phase state file of epoch 238 for transaction 4095/, - qr/removing future two-phase state file of epoch 511 for transaction 4095/ - ]); - done_testing();