diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out index dd6053f9c1f..57268b38d33 100644 --- a/contrib/test_decoding/expected/oldest_xmin.out +++ b/contrib/test_decoding/expected/oldest_xmin.out @@ -38,3 +38,44 @@ COMMIT stop (1 row) + +starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_advance_slot s0_advance_slot s1_commit s0_vacuum s0_get_changes +step s0_begin: BEGIN; +step s0_getxid: SELECT pg_current_xact_id() IS NULL; +?column? +-------- +f +(1 row) + +step s1_begin: BEGIN; +step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3)); +step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos; +step s0_commit: COMMIT; +step s0_checkpoint: CHECKPOINT; +step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); +slot_name +-------------- +isolation_slot +(1 row) + +step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); +slot_name +-------------- +isolation_slot +(1 row) + +step s1_commit: COMMIT; +step s0_vacuum: VACUUM pg_attribute; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data +------------------------------------------------------ +BEGIN +table public.harvest: INSERT: fruits[basket]:'(1,2,3)' +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec index 88bd30f5ff7..7f2fe3d7ed7 100644 --- a/contrib/test_decoding/specs/oldest_xmin.spec +++ b/contrib/test_decoding/specs/oldest_xmin.spec @@ -25,6 +25,7 @@ step "s0_commit" { COMMIT; } step "s0_checkpoint" { CHECKPOINT; } step "s0_vacuum" { VACUUM pg_attribute; } step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } +step "s0_advance_slot" { SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); } session "s1" setup { SET synchronous_commit=on; } @@ -40,3 +41,7 @@ step "s1_commit" { COMMIT; } # will be removed (xmax set) before T1 commits. That is, interlocking doesn't # forbid modifying catalog after someone read it (and didn't commit yet). permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes" + +# Perform the same testing process as described above, but use advance_slot to +# forces xmin advancement during fast forward decoding. +permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_advance_slot" "s0_advance_slot" "s1_commit" "s0_vacuum" "s0_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 78f9a0a11c4..cc03f0706e9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -412,19 +412,24 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP2_MULTI_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: + if (!ctx->fast_forward) { xl_heap_new_cid *xlrec; @@ -471,16 +476,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeInsert(ctx, buf); break; @@ -491,17 +500,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeUpdate(ctx, buf); break; case XLOG_HEAP_DELETE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeDelete(ctx, buf); break; case XLOG_HEAP_TRUNCATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeTruncate(ctx, buf); break; @@ -525,7 +537,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_HEAP_CONFIRM: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeSpecConfirm(ctx, buf); break;