diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 735b7e7653c..f122dc3a82d 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot \ - skip_snapshot_restore + skip_snapshot_restore invalidation_distrubution REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out new file mode 100644 index 00000000000..eb70eda9042 --- /dev/null +++ b/contrib/test_decoding/expected/invalidation_distrubution.out @@ -0,0 +1,20 @@ +Parsed test spec with 2 sessions + +starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec new file mode 100644 index 00000000000..ca051fc1e85 --- /dev/null +++ b/contrib/test_decoding/specs/invalidation_distrubution.spec @@ -0,0 +1,32 @@ +# Test that catalog cache invalidation messages are distributed to ongoing +# transactions, ensuring they can access the updated catalog content after +# processing these messages. +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); + CREATE TABLE tbl1(val1 integer, val2 integer); + CREATE PUBLICATION pub; +} + +teardown +{ + DROP TABLE tbl1; + DROP PUBLICATION pub; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1_begin" { BEGIN; } +step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; } +step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; } + +# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 56c25e3a6da..fa9413fa2a0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage *msgs) { ReorderBufferTXN *txn; + MemoryContext oldcontext; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - if (txn->ninvalidations != 0) - elog(ERROR, "only ever add one set of invalidations"); + oldcontext = MemoryContextSwitchTo(rb->context); + + /* + * Collect all the invalidations under the top transaction, if available, + * so that we can execute them all together. + */ + if (txn->toplevel_xid) + { + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn, + true); + } Assert(nmsgs > 0); - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * + (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } + + MemoryContextSwitchTo(oldcontext); } /* @@ -3895,3 +3920,26 @@ restart: *cmax = ent->cmax; return true; } + +/* + * Count invalidation messages of specified transaction. + * + * Returns number of messages, and msgs is set to the pointer of the linked + * list for the messages. + */ +uint32 +ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, + SharedInvalidationMessage **msgs) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + if (txn == NULL) + return 0; + + *msgs = txn->invalidations; + + return txn->ninvalidations; +} diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7546de96763..3bda41c5251 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -292,7 +292,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap); static void SnapBuildSnapIncRefcount(Snapshot snap); -static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); @@ -861,15 +861,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, } /* - * Add a new Snapshot to all transactions we're decoding that currently are - * in-progress so they can see new catalog contents made by the transaction - * that just committed. This is necessary because those in-progress - * transactions will use the new catalog's contents from here on (at the very - * least everything they do needs to be compatible with newer catalog - * contents). + * Add a new Snapshot and invalidation messages to all transactions we're + * decoding that currently are in-progress so they can see new catalog contents + * made by the transaction that just committed. This is necessary because those + * in-progress transactions will use the new catalog's contents from here on + * (at the very least everything they do needs to be compatible with newer + * catalog contents). */ static void -SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) { dlist_iter txn_i; ReorderBufferTXN *txn; @@ -877,7 +877,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) /* * Iterate through all toplevel transactions. This can include * subtransactions which we just don't yet know to be that, but that's - * fine, they will just get an unnecessary snapshot queued. + * fine, they will just get an unnecessary snapshot and invalidations + * queued. */ dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) { @@ -890,6 +891,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * transaction which in turn implies we don't yet need a snapshot at * all. We'll add a snapshot when the first change gets queued. * + * Similarly, we don't need to add invalidations to a transaction whose + * base snapshot is not yet set. Once a base snapshot is built, it will + * include the xids of committed transactions that have modified the + * catalog, thus reflecting the new catalog contents. The existing + * catalog cache will have already been invalidated after processing + * the invalidations in the transaction that modified catalogs, + * ensuring that a fresh cache is constructed during decoding. + * * NB: This works correctly even for subtransactions because * ReorderBufferAssignChild() takes care to transfer the base snapshot * to the top-level transaction, and while iterating the changequeue @@ -898,7 +907,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) continue; - elog(DEBUG2, "adding a new snapshot to %u at %X/%X", + elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X", txn->xid, (uint32) (lsn >> 32), (uint32) lsn); /* @@ -908,6 +917,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, builder->snapshot); + + /* + * Add invalidation messages to the reorder buffer of in-progress + * transactions except the current committed transaction, for which we + * will execute invalidations at the end. + * + * It is required, otherwise, we will end up using the stale catcache + * contents built by the current transaction even after its decoding, + * which should have been invalidated due to concurrent catalog + * changing transaction. + */ + if (txn->xid != xid) + { + uint32 ninvalidations; + SharedInvalidationMessage *msgs = NULL; + + ninvalidations = ReorderBufferGetInvalidations(builder->reorder, + xid, &msgs); + + if (ninvalidations > 0) + { + Assert(msgs != NULL); + + ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, + ninvalidations, msgs); + } + } } } @@ -1186,8 +1222,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new catalog snapshot to all currently running transactions */ - SnapBuildDistributeNewCatalogSnapshot(builder, lsn); + /* + * Add a new catalog snapshot and invalidations messages to all + * currently running transactions. + */ + SnapBuildDistributeSnapshotAndInval(builder, lsn, xid); } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5347597e92b..545cee891ed 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -463,6 +463,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); +uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, + TransactionId xid, + SharedInvalidationMessage **msgs); + void StartupReorderBuffer(void); #endif