diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f439c582a5f..6ec09ab192e 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ spill slot truncate ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 00000000000..dc4f9b7018f --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 00000000000..2971ddc69cb --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5a2b828aa3f..87cbd08e858 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -582,7 +582,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (!ctx->fast_forward) ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, parsed->nmsgs, parsed->msgs); - ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. It is possible that we didn't mark this transaction and + * its subtransactions as containing catalog changes when the decoding + * starts from a commit record without decoding the transaction's other + * changes. Therefore, we ensure to mark such transactions as containing + * catalog change. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * these transactions in the historic snapshot. + */ + SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid, + parsed->nsubxacts, parsed->subxacts, + buf->origptr); } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index be46bf0363d..d407fb3440e 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -252,8 +252,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded was written. The array is + * sorted in xidComparator order. We remove xids from this array when + * they become old enough to matter, and then it eventually becomes empty. + * This array is allocated in builder->context so its lifetime is the same + * as the snapshot builder. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that the + * logical decoding decodes only the commit record of the transaction after + * restoring the previously serialized snapshot in which case we will miss + * adding the xid to the snapshot and end up looking at the catalogs with the + * wrong snapshot. + * + * Now to avoid the above problem, if the COMMIT record of the xid listed in + * InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top + * transaction and its substransactions as containing catalog changes. + * + * We could end up adding the transaction that didn't change catalog + * to the snapshot since we cannot distinguish whether the transaction + * has catalog changes only by checking the COMMIT record. It doesn't + * have the information on which (sub) transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction has catalog change. But that won't be a problem since we + * use snapshot built during decoding only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitailRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -890,12 +920,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -930,6 +965,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (NInitialRunningXacts == 0) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also + * be sorted in xidComparator order. + */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -1137,7 +1215,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1288,6 +1366,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded was written. We use + * this later to identify the transactions have performed catalog + * changes. See SnapBuildXidSetCatalogChanges. + */ + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + /* there won't be any state to cleanup */ return false; } @@ -2030,3 +2122,30 @@ CheckPointSnapBuild(void) } FreeDir(snap_dir); } + +/* + * Mark the transaction as containing catalog changes. In addition, if the + * given xid is in the list of the initial running xacts, we mark its + * subtransactions as well. See comments for NInitialRunningXacts and + * InitialRunningXacts for additional info. + */ +void +SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt, + TransactionId *subxacts, XLogRecPtr lsn) +{ + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + /* Skip if there is no initial running xacts information */ + if (NInitialRunningXacts == 0) + return; + + if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, + sizeof(TransactionId), xidComparator) != NULL) + { + for (int i = 0; i < subxcnt; i++) + { + ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + } + } +} diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index b048dc7484c..17d2f933004 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -88,4 +88,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn); #endif /* SNAPBUILD_H */