diff --git a/contrib/test_decoding/expected/invalidation_distribution.out b/contrib/test_decoding/expected/invalidation_distribution.out index ad0a944cbf3..ae53b1e61de 100644 --- a/contrib/test_decoding/expected/invalidation_distribution.out +++ b/contrib/test_decoding/expected/invalidation_distribution.out @@ -1,4 +1,4 @@ -Parsed test spec with 2 sessions +Parsed test spec with 3 sessions starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); @@ -18,3 +18,24 @@ count stop (1 row) + +starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s3_begin: BEGIN; +step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2); +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s3_commit: COMMIT; +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/invalidation_distribution.spec b/contrib/test_decoding/specs/invalidation_distribution.spec index decbed627e3..67d41969ac1 100644 --- a/contrib/test_decoding/specs/invalidation_distribution.spec +++ b/contrib/test_decoding/specs/invalidation_distribution.spec @@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; } step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; } step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; } +session "s3" +setup { SET synchronous_commit=on; } +step "s3_begin" { BEGIN; } +step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); } +step "s3_commit" { COMMIT; } + # Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" + +# Expect to get one insert change with LOGICAL_REP_MSG_INSERT = 'I' from +# the second "s1_insert_tbl1" executed after adding the table tbl1 to the +# publication in "s2_alter_pub_add_tbl". +permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fa04e829cc9..6464a926c20 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -103,11 +103,22 @@ #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" +#include "utils/inval.h" #include "utils/memdebug.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/relfilenumbermap.h" +/* + * Each transaction has an 8MB limit for invalidation messages distributed from + * other transactions. This limit is set considering scenarios with many + * concurrent logical decoding operations. When the distributed invalidation + * messages reach this threshold, the transaction is marked as + * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost + * some inval messages and hence don't know what needs to be invalidated. + */ +#define MAX_DISTR_INVAL_MSG_PER_TXN \ + ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage)) /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt @@ -461,6 +472,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + if (txn->invalidations_distributed) + { + pfree(txn->invalidations_distributed); + txn->invalidations_distributed = NULL; + } + /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); @@ -2539,7 +2556,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + if (rbtxn_distr_inval_overflowed(txn)) + { + Assert(txn->ninvalidations_distributed == 0); + InvalidateSystemCaches(); + } + else + { + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed, + txn->invalidations_distributed); + } if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2585,8 +2612,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(txn->ninvalidations, - txn->invalidations); + if (rbtxn_distr_inval_overflowed(txn)) + { + Assert(txn->ninvalidations_distributed == 0); + InvalidateSystemCaches(); + } + else + { + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed, + txn->invalidations_distributed); + } if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2916,7 +2952,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, * We might have decoded changes for this transaction that could load * the cache as per the current transaction's view (consider DDL's * happened in this transaction). We don't want the decoding of future - * transactions to use those cache entries so execute invalidations. + * transactions to use those cache entries so execute only the inval + * messages in this transaction. */ if (txn->ninvalidations > 0) ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, @@ -3003,9 +3040,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) txn->final_lsn = lsn; /* - * Process cache invalidation messages if there are any. Even if we're not - * interested in the transaction's contents, it could have manipulated the - * catalog and we need to update the caches according to that. + * Process only cache invalidation messages in this transaction if there + * are any. Even if we're not interested in the transaction's contents, it + * could have manipulated the catalog and we need to update the caches + * according to that. */ if (txn->base_snapshot != NULL && txn->ninvalidations > 0) ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, @@ -3257,6 +3295,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn->ntuplecids++; } +/* + * Add new invalidation messages to the reorder buffer queue. + */ +static void +ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs) +{ + ReorderBufferChange *change; + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); +} + +/* + * A helper function for ReorderBufferAddInvalidations() and + * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation + * messages to the **invals_out. + */ +static void +ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, + uint32 *ninvals_out, + SharedInvalidationMessage *msgs_new, + Size nmsgs_new) +{ + if (*ninvals_out == 0) + { + *ninvals_out = nmsgs_new; + *invals_out = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs_new); + memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new); + } + else + { + /* Enlarge the array of inval messages */ + *invals_out = (SharedInvalidationMessage *) + repalloc(*invals_out, sizeof(SharedInvalidationMessage) * + (*ninvals_out + nmsgs_new)); + memcpy(*invals_out + *ninvals_out, msgs_new, + nmsgs_new * sizeof(SharedInvalidationMessage)); + *ninvals_out += nmsgs_new; + } +} + /* * Accumulate the invalidations for executing them later. * @@ -3277,7 +3366,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, { ReorderBufferTXN *txn; MemoryContext oldcontext; - ReorderBufferChange *change; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); @@ -3292,35 +3380,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, Assert(nmsgs > 0); - /* Accumulate invalidations. */ - if (txn->ninvalidations == 0) - { - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - } - else - { - txn->invalidations = (SharedInvalidationMessage *) - repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * - (txn->ninvalidations + nmsgs)); + ReorderBufferAccumulateInvalidations(&txn->invalidations, + &txn->ninvalidations, + msgs, nmsgs); - memcpy(txn->invalidations + txn->ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - txn->ninvalidations += nmsgs; + ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Accumulate the invalidations distributed by other committed transactions + * for executing them later. + * + * This function is similar to ReorderBufferAddInvalidations() but stores + * the given inval messages to the txn->invalidations_distributed with the + * overflow check. + * + * This needs to be called by committed transactions to distribute their + * inval messages to in-progress transactions. + */ +void +ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs) +{ + ReorderBufferTXN *txn; + MemoryContext oldcontext; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + oldcontext = MemoryContextSwitchTo(rb->context); + + /* + * Collect all the invalidations under the top transaction, if available, + * so that we can execute them all together. See comments + * ReorderBufferAddInvalidations. + */ + txn = rbtxn_get_toptxn(txn); + + Assert(nmsgs > 0); + + if (!rbtxn_distr_inval_overflowed(txn)) + { + /* + * Check the transaction has enough space for storing distributed + * invalidation messages. + */ + if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN) + { + /* + * Mark the invalidation message as overflowed and free up the + * messages accumulated so far. + */ + txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED; + + if (txn->invalidations_distributed) + { + pfree(txn->invalidations_distributed); + txn->invalidations_distributed = NULL; + txn->ninvalidations_distributed = 0; + } + } + else + ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed, + &txn->ninvalidations_distributed, + msgs, nmsgs); } - change = ReorderBufferGetChange(rb); - change->action = REORDER_BUFFER_CHANGE_INVALIDATION; - change->data.inval.ninvalidations = nmsgs; - change->data.inval.invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(change->data.inval.invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - - ReorderBufferQueueChange(rb, xid, lsn, change, false); + /* Queue the invalidation messages into the transaction */ + ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs); MemoryContextSwitchTo(oldcontext); } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index d48ebb8337b..6c49dda75b6 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -918,6 +918,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact * contents built by the current transaction even after its decoding, * which should have been invalidated due to concurrent catalog * changing transaction. + * + * Distribute only the invalidation messages generated by the current + * committed transaction. Invalidation messages received from other + * transactions would have already been propagated to the relevant + * in-progress transactions. This transaction would have processed + * those invalidations, ensuring that subsequent transactions observe + * a consistent cache state. */ if (txn->xid != xid) { @@ -931,8 +938,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact { Assert(msgs != NULL); - ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, - ninvalidations, msgs); + ReorderBufferAddDistributedInvalidations(builder->reorder, + txn->xid, lsn, + ninvalidations, msgs); } } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 8a32bbe28df..1606e0ea007 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -177,15 +177,16 @@ typedef struct ReorderBufferChange } ReorderBufferChange; /* ReorderBufferTXN txn_flags */ -#define RBTXN_HAS_CATALOG_CHANGES 0x0001 -#define RBTXN_IS_SUBXACT 0x0002 -#define RBTXN_IS_SERIALIZED 0x0004 -#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 -#define RBTXN_IS_STREAMED 0x0010 -#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 -#define RBTXN_PREPARE 0x0040 -#define RBTXN_SKIPPED_PREPARE 0x0080 -#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 +#define RBTXN_HAS_CATALOG_CHANGES 0x0001 +#define RBTXN_IS_SUBXACT 0x0002 +#define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 +#define RBTXN_IS_STREAMED 0x0010 +#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 +#define RBTXN_PREPARE 0x0040 +#define RBTXN_SKIPPED_PREPARE 0x0080 +#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 +#define RBTXN_DISTR_INVAL_OVERFLOWED 0x0200 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -249,6 +250,12 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ ) +/* Is the array of distributed inval messages overflowed? */ +#define rbtxn_distr_inval_overflowed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \ +) + /* Is this a top-level transaction? */ #define rbtxn_is_toptxn(txn) \ ( \ @@ -435,6 +442,12 @@ typedef struct ReorderBufferTXN * Private data pointer of the output plugin. */ void *output_plugin_private; + + /* + * Stores cache invalidation messages distributed by other transactions. + */ + uint32 ninvalidations_distributed; + SharedInvalidationMessage *invalidations_distributed; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -728,6 +741,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, CommandId cmin, CommandId cmax, CommandId combocid); extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs); extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations); extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);