diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c66ca72bf57..fd55b50262a 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid, static void RemoveGXact(GlobalTransaction gxact); static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); -static char *ProcessTwoPhaseBuffer(TransactionId xid, +static char *ProcessTwoPhaseBuffer(FullTransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid); static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); -static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); +static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning); static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); /* @@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held) /************************************************************************/ /* - * Compute the FullTransactionId for the given TransactionId. - * - * The wrap logic is safe here because the span of active xids cannot exceed one - * epoch at any given time. + * Compute FullTransactionId for the given TransactionId, using the current + * epoch. 
*/ static inline FullTransactionId -AdjustToFullTransactionId(TransactionId xid) +FullTransactionIdFromCurrentEpoch(TransactionId xid) { + FullTransactionId fxid; FullTransactionId nextFullXid; - TransactionId nextXid; uint32 epoch; - Assert(TransactionIdIsValid(xid)); - - LWLockAcquire(XidGenLock, LW_SHARED); - nextFullXid = TransamVariables->nextXid; - LWLockRelease(XidGenLock); - - nextXid = XidFromFullTransactionId(nextFullXid); + nextFullXid = ReadNextFullTransactionId(); epoch = EpochFromFullTransactionId(nextFullXid); - if (unlikely(xid > nextXid)) - { - /* Wraparound occurred, must be from a prev epoch. */ - Assert(epoch > 0); - epoch--; - } - return FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + return fxid; } static inline int -TwoPhaseFilePath(char *path, TransactionId xid) +TwoPhaseFilePath(char *path, FullTransactionId fxid) { - FullTransactionId fxid = AdjustToFullTransactionId(xid); - return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X", EpochFromFullTransactionId(fxid), XidFromFullTransactionId(fxid)); @@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * If it looks OK (has a valid magic number and CRC), return the palloc'd * contents of the file, issuing an error when finding corrupted data. If * missing_ok is true, which indicates that missing files can be safely - * ignored, then return NULL. This state can be reached when doing recovery. + * ignored, then return NULL. This state can be reached when doing recovery + * after discarding two-phase files from other epochs. 
 */ static char * ReadTwoPhaseFile(TransactionId xid, bool missing_ok) @@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) pg_crc32c calc_crc, file_crc; int r; + FullTransactionId fxid; - TwoPhaseFilePath(path, xid); + fxid = FullTransactionIdFromCurrentEpoch(xid); + TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); if (fd < 0) @@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit) AtEOXact_PgStat(isCommit, false); /* - * And now we can clean up any files we may have left. + * And now we can clean up any files we may have left. These should be + * from the current epoch. */ if (ondisk) - RemoveTwoPhaseFile(xid, true); + { + FullTransactionId fxid; + + fxid = FullTransactionIdFromCurrentEpoch(xid); + RemoveTwoPhaseFile(fxid, true); + } MyLockedGxact = NULL; @@ -1719,13 +1713,17 @@ ProcessRecords(char *bufptr, TransactionId xid, * * If giveWarning is false, do not complain about file-not-present; * this is an expected case during WAL replay. + * + * This routine is used at early stages of recovery where future and + * past orphaned files are checked, hence the FullTransactionId to build + * a complete file name fit for the removal. 
*/ static void -RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) +RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning) { char path[MAXPGPATH]; - TwoPhaseFilePath(path, xid); + TwoPhaseFilePath(path, fxid); if (unlink(path)) if (errno != ENOENT || giveWarning) ereport(WARNING, @@ -1745,13 +1743,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) char path[MAXPGPATH]; pg_crc32c statefile_crc; int fd; + FullTransactionId fxid; /* Recompute CRC */ INIT_CRC32C(statefile_crc); COMP_CRC32C(statefile_crc, content, len); FIN_CRC32C(statefile_crc); - TwoPhaseFilePath(path, xid); + /* Use current epoch */ + fxid = FullTransactionIdFromCurrentEpoch(xid); + TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY); @@ -1899,7 +1900,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data. * This is called once at the beginning of recovery, saving any extra * lookups in the future. Two-phase files that are newer than the - * minimum XID horizon are discarded on the way. + * minimum XID horizon are discarded on the way. Two-phase files with + * an epoch older or newer than the current checkpoint's record epoch + * are also discarded. 
*/ void restoreTwoPhaseData(void) @@ -1914,14 +1917,11 @@ restoreTwoPhaseData(void) if (strlen(clde->d_name) == 16 && strspn(clde->d_name, "0123456789ABCDEF") == 16) { - TransactionId xid; FullTransactionId fxid; char *buf; fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16)); - xid = XidFromFullTransactionId(fxid); - - buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr, + buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr, true, false, false); if (buf == NULL) continue; @@ -1972,6 +1972,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId result = origNextXid; TransactionId *xids = NULL; + uint32 epoch = EpochFromFullTransactionId(nextXid); int nxids = 0; int allocsize = 0; int i; @@ -1980,6 +1981,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; + FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; @@ -1987,7 +1989,12 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) xid = gxact->xid; - buf = ProcessTwoPhaseBuffer(xid, + /* + * All two-phase files with past and future epoch in pg_twophase are + * gone at this point, so we're OK to rely on only the current epoch. 
+ */ + fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, false, true); @@ -2049,11 +2056,18 @@ void StandbyRecoverPreparedTransactions(void) { int i; + uint32 epoch; + FullTransactionId nextFullXid; + + /* get current epoch */ + nextFullXid = ReadNextFullTransactionId(); + epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; + FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; @@ -2061,7 +2075,12 @@ StandbyRecoverPreparedTransactions(void) xid = gxact->xid; - buf = ProcessTwoPhaseBuffer(xid, + /* + * At this stage, we're OK to work with the current epoch as all past + * and future files have been already discarded. + */ + fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf != NULL) @@ -2090,11 +2109,18 @@ void RecoverPreparedTransactions(void) { int i; + uint32 epoch; + FullTransactionId nextFullXid; + + /* get current epoch */ + nextFullXid = ReadNextFullTransactionId(); + epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; + FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; char *bufptr; @@ -2102,6 +2128,10 @@ RecoverPreparedTransactions(void) TransactionId *subxids; const char *gid; + /* + * At this stage, we're OK to work with the current epoch as all past + * and future files have been already discarded. + */ xid = gxact->xid; /* @@ -2113,7 +2143,8 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. 
*/ - buf = ProcessTwoPhaseBuffer(xid, + fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf == NULL) @@ -2181,7 +2212,7 @@ RecoverPreparedTransactions(void) /* * ProcessTwoPhaseBuffer * - * Given a transaction id, read it either from disk or read it directly + * Given a FullTransactionId, read it either from disk or read it directly * via shmem xlog record pointer using the provided "prepare_start_lsn". * * If setParent is true, set up subtransaction parent linkages. @@ -2190,32 +2221,35 @@ RecoverPreparedTransactions(void) * value scanned. */ static char * -ProcessTwoPhaseBuffer(TransactionId xid, +ProcessTwoPhaseBuffer(FullTransactionId fxid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid) { FullTransactionId nextXid = TransamVariables->nextXid; - TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId *subxids; char *buf; TwoPhaseFileHeader *hdr; int i; + TransactionId xid = XidFromFullTransactionId(fxid); Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); - /* Reject XID if too new */ - if (TransactionIdFollowsOrEquals(xid, origNextXid)) + /* + * Reject full XID if too new. Note that this discards files from future + * epochs. 
+ */ + if (FullTransactionIdFollowsOrEquals(fxid, nextXid)) { if (fromdisk) { ereport(WARNING, - (errmsg("removing future two-phase state file for transaction %u", - xid))); - RemoveTwoPhaseFile(xid, true); + (errmsg("removing future two-phase state file of epoch %u for transaction %u", + EpochFromFullTransactionId(fxid), xid))); + RemoveTwoPhaseFile(fxid, true); } else { @@ -2227,6 +2261,26 @@ ProcessTwoPhaseBuffer(TransactionId xid, return NULL; } + /* Discard files from past epochs */ + if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing past two-phase state file of epoch %u for transaction %u", + EpochFromFullTransactionId(fxid), xid))); + RemoveTwoPhaseFile(fxid, true); + } + else + { + ereport(WARNING, + (errmsg("removing past two-phase state from memory for transaction %u", + xid))); + PrepareRedoRemove(xid, true); + } + return NULL; + } + /* Already processed? */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { @@ -2235,7 +2289,7 @@ ProcessTwoPhaseBuffer(TransactionId xid, ereport(WARNING, (errmsg("removing stale two-phase state file for transaction %u", xid))); - RemoveTwoPhaseFile(xid, true); + RemoveTwoPhaseFile(fxid, true); } else { @@ -2521,8 +2575,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, if (!XLogRecPtrIsInvalid(start_lsn)) { char path[MAXPGPATH]; + FullTransactionId fxid; - TwoPhaseFilePath(path, hdr->xid); + /* Use current epoch */ + fxid = FullTransactionIdFromCurrentEpoch(hdr->xid); + TwoPhaseFilePath(path, fxid); if (access(path, F_OK) == 0) { @@ -2617,7 +2674,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) */ elog(DEBUG2, "removing 2PC data for transaction %u", xid); if (gxact->ondisk) - RemoveTwoPhaseFile(xid, giveWarning); + { + FullTransactionId fxid; + + /* + * We should deal with a file at the current epoch here. 
+ */ + fxid = FullTransactionIdFromCurrentEpoch(xid); + RemoveTwoPhaseFile(fxid, giveWarning); + } RemoveGXact(gxact); } diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl index 4b3e0f77dc0..f504245c3b2 100644 --- a/src/test/recovery/t/009_twophase.pl +++ b/src/test/recovery/t/009_twophase.pl @@ -572,4 +572,38 @@ my $nsubtrans = $cur_primary->safe_psql('postgres', ); isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed"); +############################################################################### +# Check handling of orphaned 2PC files at recovery. +############################################################################### + +$cur_standby->teardown_node; +$cur_primary->teardown_node; + +# Grab location in logs of primary +my $log_offset = -s $cur_primary->logfile; + +# Create fake files with a transaction ID large or low enough to be in the +# future or the past, in different epochs, then check that the primary is able +# to start and remove these files at recovery. + +# First bump the epoch with pg_resetwal. +$cur_primary->command_ok( + [ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ], + 'bump epoch of primary'); + +my $future_2pc_file = + $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF'; +append_to_file $future_2pc_file, ""; +my $past_2pc_file = $cur_primary->data_dir . '/pg_twophase/000000EE00000FFF'; +append_to_file $past_2pc_file, ""; + +$cur_primary->start; +$cur_primary->log_check( + "two-phase files removed at recovery", + $log_offset, + log_like => [ + qr/removing past two-phase state file of epoch 238 for transaction 4095/, + qr/removing future two-phase state file of epoch 511 for transaction 4095/ + ]); + done_testing();