diff --git a/contrib/test_decoding/expected/invalidation_distribution.out b/contrib/test_decoding/expected/invalidation_distribution.out
index eb70eda9042..a78ea1a26c5 100644
--- a/contrib/test_decoding/expected/invalidation_distribution.out
+++ b/contrib/test_decoding/expected/invalidation_distribution.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
 step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
@@ -18,3 +18,24 @@ count
 stop    
 (1 row)
 
+
+starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s3_begin: BEGIN;
+step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_commit: COMMIT;
+step s3_commit: COMMIT;
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    0
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distribution.spec b/contrib/test_decoding/specs/invalidation_distribution.spec
index ca051fc1e85..c5977b8c483 100644
--- a/contrib/test_decoding/specs/invalidation_distribution.spec
+++ b/contrib/test_decoding/specs/invalidation_distribution.spec
@@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; }
 step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
 step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
 
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_begin" { BEGIN; }
+step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); }
+step "s3_commit" { COMMIT; }
+
 # Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
 permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
+
+# Expect to get no change because both s1's and s3's transactions
+# use the snapshot from before adding the table tbl1 to the
+# publication by "s2_alter_pub_add_tbl".
+permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index fa9413fa2a0..e5fa1d62151 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -103,12 +103,24 @@
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
+#include "utils/inval.h"
 #include "utils/memdebug.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenodemap.h"
 
 
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+	((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
 {
@@ -220,7 +232,8 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 									   ReorderBufferIterTXNState *state);
-static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferExecuteInvalidations(uint32 nmsgs,
+											  SharedInvalidationMessage *msgs);
 
 /*
  * ---------------------------------------
@@ -406,6 +419,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+	if (txn->invalidations_distributed)
+	{
+		pfree(txn->invalidations_distributed);
+		txn->invalidations_distributed = NULL;
+	}
+
 	/* Reset the toast hash */
 	ReorderBufferToastReset(rb, txn);
 
@@ -1883,7 +1902,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						 * see new catalog contents, so execute all
 						 * invalidations.
 						 */
-						ReorderBufferExecuteInvalidations(rb, txn);
+						ReorderBufferExecuteInvalidations(txn->ninvalidations,
+														  txn->invalidations);
 					}
 
 					break;
@@ -1921,7 +1941,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(rb, txn);
+		if (rbtxn_distr_inval_overflowed(txn))
+		{
+			Assert(txn->ninvalidations_distributed == 0);
+			InvalidateSystemCaches();
+		}
+		else
+		{
+			ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+			ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+											  txn->invalidations_distributed);
+		}
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -1947,7 +1977,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(rb, txn);
+		if (rbtxn_distr_inval_overflowed(txn))
+		{
+			Assert(txn->ninvalidations_distributed == 0);
+			InvalidateSystemCaches();
+		}
+		else
+		{
+			ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+			ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+											  txn->invalidations_distributed);
+		}
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2060,9 +2100,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	txn->final_lsn = lsn;
 
 	/*
-	 * Process cache invalidation messages if there are any. Even if we're not
-	 * interested in the transaction's contents, it could have manipulated the
-	 * catalog and we need to update the caches according to that.
+	 * Process only the invalidation messages generated by this transaction
+	 * itself, if any. Even if we're not interested in its contents, it
+	 * could have manipulated the catalog and we need to update the caches
+	 * according to that.
 	 */
 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
 		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -2253,6 +2294,36 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 	txn->ntuplecids++;
 }
 
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
+ * messages into the array pointed to by *invals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+									 uint32 *ninvals_out,
+									 SharedInvalidationMessage *msgs_new,
+									 Size nmsgs_new)
+{
+	if (*ninvals_out == 0)
+	{
+		*ninvals_out = nmsgs_new;
+		*invals_out = (SharedInvalidationMessage *)
+			palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+		memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+	}
+	else
+	{
+		/* Enlarge the array of inval messages */
+		*invals_out = (SharedInvalidationMessage *)
+			repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+					 (*ninvals_out + nmsgs_new));
+		memcpy(*invals_out + *ninvals_out, msgs_new,
+			   nmsgs_new * sizeof(SharedInvalidationMessage));
+		*ninvals_out += nmsgs_new;
+	}
+}
+
 /*
  * Setup the invalidation of the toplevel transaction.
  *
@@ -2282,24 +2353,74 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 
 	Assert(nmsgs > 0);
 
-	/* Accumulate invalidations. */
-	if (txn->ninvalidations == 0)
-	{
-		txn->ninvalidations = nmsgs;
-		txn->invalidations = (SharedInvalidationMessage *)
-			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-		memcpy(txn->invalidations, msgs,
-			   sizeof(SharedInvalidationMessage) * nmsgs);
-	}
-	else
-	{
-		txn->invalidations = (SharedInvalidationMessage *)
-			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
-					 (txn->ninvalidations + nmsgs));
+	ReorderBufferAccumulateInvalidations(&txn->invalidations,
+										 &txn->ninvalidations,
+										 msgs, nmsgs);
 
-		memcpy(txn->invalidations + txn->ninvalidations, msgs,
-			   nmsgs * sizeof(SharedInvalidationMessage));
-		txn->ninvalidations += nmsgs;
+	MemoryContextSwitchTo(oldcontext);
+}
+
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+										 XLogRecPtr lsn, Size nmsgs,
+										 SharedInvalidationMessage *msgs)
+{
+	ReorderBufferTXN *txn;
+	MemoryContext oldcontext;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	/*
+	 * Collect all the invalidations under the top transaction, if available,
+	 * so that we can execute them all together.
+	 */
+	if (txn->toplevel_xid)
+	{
+		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
+									true);
+	}
+
+	Assert(nmsgs > 0);
+
+	if (!rbtxn_distr_inval_overflowed(txn))
+	{
+		/*
+		 * Check the transaction has enough space for storing distributed
+		 * invalidation messages.
+		 */
+		if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+		{
+			/*
+			 * Mark the transaction as overflowed and free up the inval
+			 * messages accumulated so far.
+			 */
+			txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
+
+			if (txn->invalidations_distributed)
+			{
+				pfree(txn->invalidations_distributed);
+				txn->invalidations_distributed = NULL;
+				txn->ninvalidations_distributed = 0;
+			}
+		}
+		else
+			ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+												 &txn->ninvalidations_distributed,
+												 msgs, nmsgs);
 	}
 
 	MemoryContextSwitchTo(oldcontext);
@@ -2310,12 +2431,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
  * in the changestream but we don't know which those are.
  */
 static void
-ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
 {
 	int			i;
 
-	for (i = 0; i < txn->ninvalidations; i++)
-		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+	for (i = 0; i < nmsgs; i++)
+		LocalExecuteInvalidationMessage(&msgs[i]);
 }
 
 /*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3bda41c5251..69b083a36fe 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -927,6 +927,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 		 * contents built by the current transaction even after its decoding,
 		 * which should have been invalidated due to concurrent catalog
 		 * changing transaction.
+		 *
+		 * Distribute only the invalidation messages generated by the current
+		 * committed transaction. Invalidation messages received from other
+		 * transactions would have already been propagated to the relevant
+		 * in-progress transactions. This transaction would have processed
+		 * those invalidations, ensuring that subsequent transactions observe
+		 * a consistent cache state.
 		 */
 		if (txn->xid != xid)
 		{
@@ -940,8 +947,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 			{
 				Assert(msgs != NULL);
 
-				ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
-											  ninvalidations, msgs);
+				ReorderBufferAddDistributedInvalidations(builder->reorder,
+														 txn->xid, lsn,
+														 ninvalidations, msgs);
 			}
 		}
 	}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 545cee891ed..2321bb0b406 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -160,9 +160,10 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
+#define RBTXN_HAS_CATALOG_CHANGES		0x0001
+#define RBTXN_IS_SUBXACT          		0x0002
+#define RBTXN_IS_SERIALIZED       		0x0004
+#define RBTXN_DISTR_INVAL_OVERFLOWED    0x0008
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -182,6 +183,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
 )
 
+/* Is the array of distributed inval messages overflowed? */
+#define rbtxn_distr_inval_overflowed(txn) \
+( \
+	((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
+)
+
 typedef struct ReorderBufferTXN
 {
 	/* See above */
@@ -311,6 +318,12 @@ typedef struct ReorderBufferTXN
 	 * Size of this transaction (changes currently in memory, in bytes).
 	 */
 	Size		size;
+
+	/*
+	 * Stores cache invalidation messages distributed by other transactions.
+	 */
+	uint32		ninvalidations_distributed;
+	SharedInvalidationMessage *invalidations_distributed;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -451,6 +464,9 @@ void		ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr ls
 										 CommandId cmin, CommandId cmax, CommandId combocid);
 void		ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
 										  Size nmsgs, SharedInvalidationMessage *msgs);
+void		ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+													 XLogRecPtr lsn, Size nmsgs,
+													 SharedInvalidationMessage *msgs);
 void		ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
 											   SharedInvalidationMessage *invalidations);
 void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);