diff --git a/contrib/test_decoding/expected/invalidation_distribution.out b/contrib/test_decoding/expected/invalidation_distribution.out
index ad0a944cbf3..ae53b1e61de 100644
--- a/contrib/test_decoding/expected/invalidation_distribution.out
+++ b/contrib/test_decoding/expected/invalidation_distribution.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
 step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
@@ -18,3 +18,24 @@ count
 stop    
 (1 row)
 
+
+starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s3_begin: BEGIN;
+step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_commit: COMMIT;
+step s3_commit: COMMIT;
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distribution.spec b/contrib/test_decoding/specs/invalidation_distribution.spec
index decbed627e3..67d41969ac1 100644
--- a/contrib/test_decoding/specs/invalidation_distribution.spec
+++ b/contrib/test_decoding/specs/invalidation_distribution.spec
@@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; }
 step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
 step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
 
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_begin" { BEGIN; }
+step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); }
+step "s3_commit" { COMMIT; }
+
 # Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
 permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
+
+# Expect to get one insert change with LOGICAL_REP_MSG_INSERT = 'I' from
+# the second "s1_insert_tbl1" executed after adding the table tbl1 to the
+# publication in "s2_alter_pub_add_tbl".
+permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 67655111875..c4299c76fb1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -109,10 +109,22 @@
 #include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+	((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
 {
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+	if (txn->invalidations_distributed)
+	{
+		pfree(txn->invalidations_distributed);
+		txn->invalidations_distributed = NULL;
+	}
+
 	/* Reset the toast hash */
 	ReorderBufferToastReset(rb, txn);
 
@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+		if (rbtxn_distr_inval_overflowed(txn))
+		{
+			Assert(txn->ninvalidations_distributed == 0);
+			InvalidateSystemCaches();
+		}
+		else
+		{
+			ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+			ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+											  txn->invalidations_distributed);
+		}
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(txn->ninvalidations,
-										  txn->invalidations);
+		if (rbtxn_distr_inval_overflowed(txn))
+		{
+			Assert(txn->ninvalidations_distributed == 0);
+			InvalidateSystemCaches();
+		}
+		else
+		{
+			ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+			ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+											  txn->invalidations_distributed);
+		}
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		 * We might have decoded changes for this transaction that could load
 		 * the cache as per the current transaction's view (consider DDL's
 		 * happened in this transaction). We don't want the decoding of future
-		 * transactions to use those cache entries so execute invalidations.
+		 * transactions to use those cache entries so execute only the inval
+		 * messages in this transaction.
 		 */
 		if (txn->ninvalidations > 0)
 			ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	txn->final_lsn = lsn;
 
 	/*
-	 * Process cache invalidation messages if there are any. Even if we're not
-	 * interested in the transaction's contents, it could have manipulated the
-	 * catalog and we need to update the caches according to that.
+	 * Process only cache invalidation messages in this transaction if there
+	 * are any. Even if we're not interested in the transaction's contents, it
+	 * could have manipulated the catalog and we need to update the caches
+	 * according to that.
 	 */
 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3421,6 +3460,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 	txn->ntuplecids++;
 }
 
+/*
+ * Add new invalidation messages to the reorder buffer queue.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+								XLogRecPtr lsn, Size nmsgs,
+								SharedInvalidationMessage *msgs)
+{
+	ReorderBufferChange *change;
+
+	change = ReorderBufferAllocChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+	change->data.inval.ninvalidations = nmsgs;
+	change->data.inval.invalidations = (SharedInvalidationMessage *)
+		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+	memcpy(change->data.inval.invalidations, msgs,
+		   sizeof(SharedInvalidationMessage) * nmsgs);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages into the array pointed to by *invals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+									 uint32 *ninvals_out,
+									 SharedInvalidationMessage *msgs_new,
+									 Size nmsgs_new)
+{
+	if (*ninvals_out == 0)
+	{
+		*ninvals_out = nmsgs_new;
+		*invals_out = (SharedInvalidationMessage *)
+			palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+		memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+	}
+	else
+	{
+		/* Enlarge the array of inval messages */
+		*invals_out = (SharedInvalidationMessage *)
+			repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+					 (*ninvals_out + nmsgs_new));
+		memcpy(*invals_out + *ninvals_out, msgs_new,
+			   nmsgs_new * sizeof(SharedInvalidationMessage));
+		*ninvals_out += nmsgs_new;
+	}
+}
+
 /*
  * Accumulate the invalidations for executing them later.
  *
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 {
 	ReorderBufferTXN *txn;
 	MemoryContext oldcontext;
-	ReorderBufferChange *change;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 
 	Assert(nmsgs > 0);
 
-	/* Accumulate invalidations. */
-	if (txn->ninvalidations == 0)
-	{
-		txn->ninvalidations = nmsgs;
-		txn->invalidations = (SharedInvalidationMessage *)
-			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-		memcpy(txn->invalidations, msgs,
-			   sizeof(SharedInvalidationMessage) * nmsgs);
-	}
-	else
-	{
-		txn->invalidations = (SharedInvalidationMessage *)
-			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
-					 (txn->ninvalidations + nmsgs));
+	ReorderBufferAccumulateInvalidations(&txn->invalidations,
+										 &txn->ninvalidations,
+										 msgs, nmsgs);
 
-		memcpy(txn->invalidations + txn->ninvalidations, msgs,
-			   nmsgs * sizeof(SharedInvalidationMessage));
-		txn->ninvalidations += nmsgs;
+	ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+										 XLogRecPtr lsn, Size nmsgs,
+										 SharedInvalidationMessage *msgs)
+{
+	ReorderBufferTXN *txn;
+	MemoryContext oldcontext;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	/*
+	 * Collect all the invalidations under the top transaction, if available,
+	 * so that we can execute them all together.  See comments
+	 * ReorderBufferAddInvalidations.
+	 */
+	txn = rbtxn_get_toptxn(txn);
+
+	Assert(nmsgs > 0);
+
+	if (!rbtxn_distr_inval_overflowed(txn))
+	{
+		/*
+		 * Check the transaction has enough space for storing distributed
+		 * invalidation messages.
+		 */
+		if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+		{
+			/*
+			 * Mark the transaction as overflowed and free up the inval
+			 * messages accumulated so far.
+			 */
+			txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
+
+			if (txn->invalidations_distributed)
+			{
+				pfree(txn->invalidations_distributed);
+				txn->invalidations_distributed = NULL;
+				txn->ninvalidations_distributed = 0;
+			}
+		}
+		else
+			ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+												 &txn->ninvalidations_distributed,
+												 msgs, nmsgs);
 	}
 
-	change = ReorderBufferAllocChange(rb);
-	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
-	change->data.inval.ninvalidations = nmsgs;
-	change->data.inval.invalidations = (SharedInvalidationMessage *)
-		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-	memcpy(change->data.inval.invalidations, msgs,
-		   sizeof(SharedInvalidationMessage) * nmsgs);
-
-	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+	/* Queue the invalidation messages into the transaction */
+	ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
 
 	MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0d7bddbe4ed..adf18c397db 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 		 * contents built by the current transaction even after its decoding,
 		 * which should have been invalidated due to concurrent catalog
 		 * changing transaction.
+		 *
+		 * Distribute only the invalidation messages generated by the current
+		 * committed transaction. Invalidation messages received from other
+		 * transactions would have already been propagated to the relevant
+		 * in-progress transactions. This transaction would have processed
+		 * those invalidations, ensuring that subsequent transactions observe
+		 * a consistent cache state.
 		 */
 		if (txn->xid != xid)
 		{
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 			{
 				Assert(msgs != NULL);
 
-				ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
-											  ninvalidations, msgs);
+				ReorderBufferAddDistributedInvalidations(builder->reorder,
+														 txn->xid, lsn,
+														 ninvalidations, msgs);
 			}
 		}
 	}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 24e88c409ba..fa0745552f8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -176,6 +176,7 @@ typedef struct ReorderBufferChange
 #define RBTXN_SENT_PREPARE			0x0200
 #define RBTXN_IS_COMMITTED			0x0400
 #define RBTXN_IS_ABORTED			0x0800
+#define RBTXN_DISTR_INVAL_OVERFLOWED	0x1000
 
 #define RBTXN_PREPARE_STATUS_MASK	(RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
 
@@ -265,6 +266,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
 )
 
+/* Has the array of distributed inval messages overflowed? */
+#define rbtxn_distr_inval_overflowed(txn) \
+( \
+	((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
+)
+
 /* Is this a top-level transaction? */
 #define rbtxn_is_toptxn(txn) \
 ( \
@@ -422,6 +429,12 @@ typedef struct ReorderBufferTXN
 	uint32		ninvalidations;
 	SharedInvalidationMessage *invalidations;
 
+	/*
+	 * Stores cache invalidation messages distributed by other transactions.
+	 */
+	uint32		ninvalidations_distributed;
+	SharedInvalidationMessage *invalidations_distributed;
+
 	/* ---
 	 * Position in one of two lists:
 	 * * list of subtransactions if we are *known* to be subxact
@@ -738,6 +751,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 										 CommandId cmin, CommandId cmax, CommandId combocid);
 extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 										  Size nmsgs, SharedInvalidationMessage *msgs);
+extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+													 XLogRecPtr lsn, Size nmsgs,
+													 SharedInvalidationMessage *msgs);
 extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
 											   SharedInvalidationMessage *invalidations);
 extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);