From 43434ed94d80b4a181d5d9dc6bda585a9267eb1e Mon Sep 17 00:00:00 2001
From: Noah Misch <noah@leadboat.com>
Date: Sat, 21 Mar 2020 09:38:33 -0700
Subject: [PATCH] Back-patch log_newpage_range().

Back-patch a subset of commit 9155580fd5fc2a0cbb23376dfca7cd21f59c2c7b
to v11, v10, 9.6, and 9.5.  Include the latest repairs to this function.
Use a new XLOG_FPI_MULTI value instead of reusing XLOG_FPI.  That way,
if an older server reads WAL from this function, that server will PANIC
instead of applying just one page of the record.  The next commit adds a
call to this function.

Discussion: https://postgr.es/m/20200304.162919.898938381201316571.horikyota.ntt@gmail.com
---
 src/backend/access/rmgrdesc/xlogdesc.c   |  6 +-
 src/backend/access/transam/xlog.c        | 23 ++++---
 src/backend/access/transam/xloginsert.c  | 88 ++++++++++++++++++++++++
 src/backend/replication/logical/decode.c |  1 +
 src/include/access/xloginsert.h          |  3 +
 src/include/catalog/pg_control.h         |  1 +
 6 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index f72f0760173..563fae5ebfa 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -78,7 +78,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 
 		appendStringInfoString(buf, xlrec->rp_name);
 	}
-	else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
+	else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
+			 info == XLOG_FPI_MULTI)
 	{
 		/* no further information to print */
 	}
@@ -182,6 +183,9 @@ xlog_identify(uint8 info)
 		case XLOG_FPI_FOR_HINT:
 			id = "FPI_FOR_HINT";
 			break;
+		case XLOG_FPI_MULTI:
+			id = "FPI_MULTI";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f4b059aa2e8..ec66b99b325 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9721,7 +9721,7 @@ xlog_redo(XLogReaderState *record)
 
 	/* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */
 	Assert(info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
-		   !XLogRecHasAnyBlockRefs(record));
+		   info == XLOG_FPI_MULTI || !XLogRecHasAnyBlockRefs(record));
 
 	if (info == XLOG_NEXTOID)
 	{
@@ -9924,14 +9924,16 @@ xlog_redo(XLogReaderState *record)
 	{
 		/* nothing to do here */
 	}
-	else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
+	else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
+			 info == XLOG_FPI_MULTI)
 	{
-		Buffer		buffer;
+		uint8		block_id;
 
 		/*
 		 * Full-page image (FPI) records contain nothing else but a backup
-		 * block. The block reference must include a full-page image -
-		 * otherwise there would be no point in this record.
+		 * block (or multiple backup blocks). Every block reference must
+		 * include a full-page image - otherwise there would be no point in
+		 * this record.
 		 *
 		 * No recovery conflicts are generated by these generic records - if a
 		 * resource manager needs to generate conflicts, it has to define a
@@ -9943,9 +9945,14 @@ xlog_redo(XLogReaderState *record)
 		 * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
 		 * code just to distinguish them for statistics purposes.
 		 */
-		if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
-			elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
-		UnlockReleaseBuffer(buffer);
+		for (block_id = 0; block_id <= record->max_block_id; block_id++)
+		{
+			Buffer		buffer;
+
+			if (XLogReadBufferForRedo(record, block_id, &buffer) != BLK_RESTORED)
+				elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
+			UnlockReleaseBuffer(buffer);
+		}
 	}
 	else if (info == XLOG_BACKUP_END)
 	{
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 1f86c907598..579d8de7759 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -1021,6 +1021,94 @@ log_newpage_buffer(Buffer buffer, bool page_std)
 	return log_newpage(&rnode, forkNum, blkno, page, page_std);
 }
 
+/*
+ * WAL-log a range of blocks in a relation.
+ *
+ * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
+ * written to the WAL. If the range is large, this is done in multiple WAL
+ * records.
+ *
+ * If all page follows the standard page layout, with a PageHeader and unused
+ * space between pd_lower and pd_upper, set 'page_std' to true. That allows
+ * the unused space to be left out from the WAL records, making them smaller.
+ *
+ * NOTE: This function acquires exclusive-locks on the pages. Typically, this
+ * is used on a newly-built relation, and the caller is holding a
+ * AccessExclusiveLock on it, so no other backend can be accessing it at the
+ * same time. If that's not the case, you must ensure that this does not
+ * cause a deadlock through some other means.
+ */
+void
+log_newpage_range(Relation rel, ForkNumber forkNum,
+				  BlockNumber startblk, BlockNumber endblk,
+				  bool page_std)
+{
+	int			flags;
+	BlockNumber blkno;
+
+	flags = REGBUF_FORCE_IMAGE;
+	if (page_std)
+		flags |= REGBUF_STANDARD;
+
+	/*
+	 * Iterate over all the pages in the range. They are collected into
+	 * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
+	 * for each batch.
+	 */
+	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
+
+	blkno = startblk;
+	while (blkno < endblk)
+	{
+		Buffer		bufpack[XLR_MAX_BLOCK_ID];
+		XLogRecPtr	recptr;
+		int			nbufs;
+		int			i;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Collect a batch of blocks. */
+		nbufs = 0;
+		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
+		{
+			Buffer		buf = ReadBufferExtended(rel, forkNum, blkno,
+												 RBM_NORMAL, NULL);
+
+			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+			/*
+			 * Completely empty pages are not WAL-logged. Writing a WAL record
+			 * would change the LSN, and we don't want that. We want the page
+			 * to stay empty.
+			 */
+			if (!PageIsNew(BufferGetPage(buf)))
+				bufpack[nbufs++] = buf;
+			else
+				UnlockReleaseBuffer(buf);
+			blkno++;
+		}
+
+		/* Write WAL record for this batch. */
+		XLogBeginInsert();
+
+		START_CRIT_SECTION();
+		for (i = 0; i < nbufs; i++)
+		{
+			XLogRegisterBuffer(i, bufpack[i], flags);
+			MarkBufferDirty(bufpack[i]);
+		}
+
+		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_MULTI);
+
+		for (i = 0; i < nbufs; i++)
+		{
+			PageSetLSN(BufferGetPage(bufpack[i]), recptr);
+			UnlockReleaseBuffer(bufpack[i]);
+		}
+		END_CRIT_SECTION();
+	}
+}
+
 /*
  * Allocate working buffers needed for WAL record construction.
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8274024b6c9..d7a6f5d4687 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -195,6 +195,7 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
 		case XLOG_FPI:
+		case XLOG_FPI_MULTI:
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 174c88677f5..09bf94630a4 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -16,6 +16,7 @@
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/relfilenode.h"
+#include "utils/relcache.h"
 
 /*
  * The minimum size of the WAL construction working area. If you need to
@@ -54,6 +55,8 @@ extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
 extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
 			BlockNumber blk, char *page, bool page_std);
 extern XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std);
+extern void log_newpage_range(Relation rel, ForkNumber forkNum,
+				  BlockNumber startblk, BlockNumber endblk, bool page_std);
 extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
 
 extern void InitXLogInsert(void);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 1ec03caf5fb..77b7a5e5299 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_FPI_MULTI					0xC0
 
 
 /*