diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a4ba1a509ae..eef70770674 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot \ - skip_snapshot_restore + skip_snapshot_restore invalidation_distrubution REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out new file mode 100644 index 00000000000..ad0a944cbf3 --- /dev/null +++ b/contrib/test_decoding/expected/invalidation_distrubution.out @@ -0,0 +1,20 @@ +Parsed test spec with 2 sessions + +starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index 54d65d3f30f..d5f03fd5e9b 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -63,6 +63,7 @@ tests += { 'twophase_snapshot', 'slot_creation_error', 'skip_snapshot_restore', + 'invalidation_distrubution', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec new file mode 100644 index 00000000000..decbed627e3 --- /dev/null +++ b/contrib/test_decoding/specs/invalidation_distrubution.spec @@ -0,0 +1,32 @@ +# Test that catalog cache invalidation messages are distributed to ongoing +# transactions, ensuring they can access the updated catalog content after +# processing these messages. +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); + CREATE TABLE tbl1(val1 integer, val2 integer); + CREATE PUBLICATION pub; +} + +teardown +{ + DROP TABLE tbl1; + DROP PUBLICATION pub; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1_begin" { BEGIN; } +step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; } +step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; } + +# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 977fbcd2474..67655111875 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -5460,3 +5460,26 @@ restart: *cmax = ent->cmax; return true; } + +/* + * Count invalidation messages of specified transaction. + * + * Returns number of messages, and msgs is set to the pointer of the linked + * list for the messages. + */ +uint32 +ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, + SharedInvalidationMessage **msgs) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + if (txn == NULL) + return 0; + + *msgs = txn->invalidations; + + return txn->ninvalidations; +} diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index b64e53de017..0d7bddbe4ed 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -161,7 +161,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap); static void SnapBuildSnapIncRefcount(Snapshot snap); -static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, uint32 xinfo); @@ -720,15 +720,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, } /* - * Add a new Snapshot to all transactions we're decoding that currently are - * in-progress so they can see new catalog contents made by the transaction - * that just committed. This is necessary because those in-progress - * transactions will use the new catalog's contents from here on (at the very - * least everything they do needs to be compatible with newer catalog - * contents). + * Add a new Snapshot and invalidation messages to all transactions we're + * decoding that currently are in-progress so they can see new catalog contents + * made by the transaction that just committed. This is necessary because those + * in-progress transactions will use the new catalog's contents from here on + * (at the very least everything they do needs to be compatible with newer + * catalog contents). */ static void -SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) { dlist_iter txn_i; ReorderBufferTXN *txn; @@ -736,7 +736,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) /* * Iterate through all toplevel transactions. This can include * subtransactions which we just don't yet know to be that, but that's - * fine, they will just get an unnecessary snapshot queued. + * fine, they will just get an unnecessary snapshot and invalidations + * queued. */ dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) { @@ -749,6 +750,15 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * transaction which in turn implies we don't yet need a snapshot at * all. We'll add a snapshot when the first change gets queued. * + * Similarly, we don't need to add invalidations to a transaction + * whose base snapshot is not yet set. Once a base snapshot is built, + * it will include the xids of committed transactions that have + * modified the catalog, thus reflecting the new catalog contents. The + * existing catalog cache will have already been invalidated after + * processing the invalidations in the transaction that modified + * catalogs, ensuring that a fresh cache is constructed during + * decoding. + * * NB: This works correctly even for subtransactions because * ReorderBufferAssignChild() takes care to transfer the base snapshot * to the top-level transaction, and while iterating the changequeue @@ -758,13 +768,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) continue; /* - * We don't need to add snapshot to prepared transactions as they - * should not see the new catalog contents. + * We don't need to add snapshot or invalidations to prepared + * transactions as they should not see the new catalog contents. */ if (rbtxn_is_prepared(txn)) continue; - elog(DEBUG2, "adding a new snapshot to %u at %X/%X", + elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X", txn->xid, LSN_FORMAT_ARGS(lsn)); /* @@ -774,6 +784,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, builder->snapshot); + + /* + * Add invalidation messages to the reorder buffer of in-progress + * transactions except the current committed transaction, for which we + * will execute invalidations at the end. + * + * It is required, otherwise, we will end up using the stale catcache + * contents built by the current transaction even after its decoding, + * which should have been invalidated due to concurrent catalog + * changing transaction. + */ + if (txn->xid != xid) + { + uint32 ninvalidations; + SharedInvalidationMessage *msgs = NULL; + + ninvalidations = ReorderBufferGetInvalidations(builder->reorder, + xid, &msgs); + + if (ninvalidations > 0) + { + Assert(msgs != NULL); + + ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, + ninvalidations, msgs); + } + } } } @@ -1045,8 +1082,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new catalog snapshot to all currently running transactions */ - SnapBuildDistributeNewCatalogSnapshot(builder, lsn); + /* + * Add a new catalog snapshot and invalidations messages to all + * currently running transactions. + */ + SnapBuildDistributeSnapshotAndInval(builder, lsn, xid); } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3be0cbd7ebe..24e88c409ba 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -758,6 +758,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb); extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr); +extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, + TransactionId xid, + SharedInvalidationMessage **msgs); + extern void StartupReorderBuffer(void); #endif