From abfb29648f9adcde84656afde1d50bc8b8e9b6e0 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 12 Feb 2025 16:55:00 -0800 Subject: [PATCH] Rename RBTXN_PREPARE to RBTXN_IS_PREPARE for better clarification. RBTXN_PREPARE flag and rbtxn_prepared macro could be misinterpreted as either indicating the transaction type (e.g. a prepared transaction or a normal transaction) or its currentstate (e.g. skipped or its prepare message is sent), especially after commit 072ee847ad4 introduced the RBTXN_SENT_PREPARE flag and the rbtxn_sent_prepare macro. The RBTXN_PREPARE flag (and its corresponding macro) have been renamed to RBTXN_IS_PREPARE to explicitly indicate the transaction type. Therefore, this commit also adds the RBTXN_IS_PREPARE flag to the transaction that is a prepared transaction and has been skipped, which previously had only the RBTXN_SKIPPED_PREPARE flag. Reviewed-by: Amit Kapila, Peter Smith Discussion: https://postgr.es/m/CAA4eK1KgNmBsG%3D155E7QQ6TX9RoWnM4z5Z20SvsbwxSe_QXYsg%40mail.gmail.com --- src/backend/replication/logical/proto.c | 2 +- .../replication/logical/reorderbuffer.c | 50 ++++++++++++------- src/backend/replication/logical/snapbuild.c | 2 +- src/include/replication/reorderbuffer.h | 8 +-- 4 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index dc72b7c8f77..1a352b542dc 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -164,7 +164,7 @@ logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, * which case we expect to have a valid GID. */ Assert(txn->gid != NULL); - Assert(rbtxn_prepared(txn)); + Assert(rbtxn_is_prepared(txn)); Assert(TransactionIdIsValid(txn->xid)); /* send the flags field */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ed5a2946dc1..b42f4002ba8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1793,7 +1793,7 @@ ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn * and the toast reconstruction data. The full cleanup will happen as part * of decoding ABORT record of this transaction. */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); ReorderBufferToastReset(rb, txn); /* All changes should be discarded */ @@ -1968,7 +1968,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - if (rbtxn_prepared(txn)) + if (rbtxn_is_prepared(txn)) { /* * Note, we send stream prepare even if a concurrent abort is @@ -2150,7 +2150,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -2238,7 +2238,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (!streaming) { - if (rbtxn_prepared(txn)) + if (rbtxn_is_prepared(txn)) rb->begin_prepare(rb, txn); else rb->begin(rb, txn); @@ -2280,7 +2280,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * required for the cases when we decode the changes before the * COMMIT record is processed. */ - if (streaming || rbtxn_prepared(change->txn)) + if (streaming || rbtxn_is_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2625,7 +2625,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * Call either PREPARE (for two-phase transactions) or COMMIT (for * regular ones). */ - if (rbtxn_prepared(txn)) + if (rbtxn_is_prepared(txn)) { Assert(!rbtxn_sent_prepare(txn)); rb->prepare(rb, txn, commit_lsn); @@ -2680,12 +2680,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * For 4, as the entire txn has been decoded, we can fully clean up * the TXN reorder buffer. */ - if (streaming || rbtxn_prepared(txn)) + if (streaming || rbtxn_is_prepared(txn)) { if (streaming) ReorderBufferMaybeMarkTXNStreamed(rb, txn); - ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2729,7 +2729,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * during a two-phase commit. */ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK && - (stream_started || rbtxn_prepared(txn))) + (stream_started || rbtxn_is_prepared(txn))) { /* curtxn must be set for streaming or prepared transactions */ Assert(curtxn); @@ -2816,7 +2816,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn, * Removing this txn before a commit might result in the computation * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts. */ - if (!rbtxn_prepared(txn)) + if (!rbtxn_is_prepared(txn)) ReorderBufferCleanupTXN(rb, txn); return; } @@ -2853,7 +2853,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* - * Record the prepare information for a transaction. + * Record the prepare information for a transaction. Also, mark the transaction + * as a prepared transaction. */ bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, @@ -2879,6 +2880,10 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; + /* Mark this transaction as a prepared transaction */ + Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0); + txn->txn_flags |= RBTXN_IS_PREPARED; + return true; } @@ -2894,6 +2899,8 @@ ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid) if (txn == NULL) return; + /* txn must have been marked as a prepared transaction */ + Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED); txn->txn_flags |= RBTXN_SKIPPED_PREPARE; } @@ -2915,12 +2922,16 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, if (txn == NULL) return; - txn->txn_flags |= RBTXN_PREPARE; - txn->gid = pstrdup(gid); - - /* The prepare info must have been updated in txn by now. */ + /* + * txn must have been marked as a prepared transaction and must have + * neither been skipped nor sent a prepare. Also, the prepare info must + * have been updated in it by now. + */ + Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED); Assert(txn->final_lsn != InvalidXLogRecPtr); + txn->gid = pstrdup(gid); + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); @@ -2976,12 +2987,13 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, */ if ((txn->final_lsn < two_phase_at) && is_commit) { - txn->txn_flags |= RBTXN_PREPARE; - /* - * The prepare info must have been updated in txn even if we skip - * prepare. + * txn must have been marked as a prepared transaction and skipped but + * not sent a prepare. Also, the prepare info must have been updated + * in txn even if we skip prepare. */ + Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == + (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE)); Assert(txn->final_lsn != InvalidXLogRecPtr); /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index bbedd3de318..05687fd75e5 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -761,7 +761,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * We don't need to add snapshot to prepared transactions as they * should not see the new catalog contents. */ - if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) + if (rbtxn_is_prepared(txn)) continue; elog(DEBUG2, "adding a new snapshot to %u at %X/%X", diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 9d9ac2f0830..517a8e3634f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -170,13 +170,15 @@ typedef struct ReorderBufferChange #define RBTXN_IS_SERIALIZED_CLEAR 0x0008 #define RBTXN_IS_STREAMED 0x0010 #define RBTXN_HAS_PARTIAL_CHANGE 0x0020 -#define RBTXN_PREPARE 0x0040 +#define RBTXN_IS_PREPARED 0x0040 #define RBTXN_SKIPPED_PREPARE 0x0080 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 #define RBTXN_SENT_PREPARE 0x0200 #define RBTXN_IS_COMMITTED 0x0400 #define RBTXN_IS_ABORTED 0x0800 +#define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE) + /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ ( \ @@ -234,9 +236,9 @@ typedef struct ReorderBufferChange * committed. To check whether a prepare or a stream_prepare has already * been sent for this transaction, we need to use rbtxn_sent_prepare(). */ -#define rbtxn_prepared(txn) \ +#define rbtxn_is_prepared(txn) \ ( \ - ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ + ((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \ ) /* Has a prepare or stream_prepare already been sent? */