From db03cf375d602e417eda6b7a55eead91618e1398 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat, 9 Apr 2016 15:02:19 -0400
Subject: [PATCH] Code review/prettification for generic_xlog.c.

Improve commentary, use more specific names for the delta fields,
const-ify pointer arguments where possible, avoid assuming that
initializing only the first element of a local array will guarantee
that the remaining elements end up as we need them.  (I think that
code in generic_redo actually worked, but only because InvalidBuffer
is zero; this is a particularly ugly way of depending on that ...)
---
 src/backend/access/transam/generic_xlog.c | 161 +++++++++++++---------
 1 file changed, 93 insertions(+), 68 deletions(-)

diff --git a/src/backend/access/transam/generic_xlog.c b/src/backend/access/transam/generic_xlog.c
index bd0a1b908df..a32f1711cf3 100644
--- a/src/backend/access/transam/generic_xlog.c
+++ b/src/backend/access/transam/generic_xlog.c
@@ -27,12 +27,12 @@
  * - length of page region (OffsetNumber)
  * - data - the data to place into the region ('length' number of bytes)
  *
- * Unchanged regions of a page are not represented in its delta.  As a
- * result, a delta can be more compact than the full page image.  But having
- * an unchanged region in the middle of two fragments that is smaller than
- * the fragment header (offset and length) does not pay off in terms of the
- * overall size of the delta. For this reason, we break fragments only if
- * the unchanged region is bigger than MATCH_THRESHOLD.
+ * Unchanged regions of a page are not represented in its delta.  As a result,
+ * a delta can be more compact than the full page image.  But having an
+ * unchanged region between two fragments that is smaller than the fragment
+ * header (offset+length) does not pay off in terms of the overall size of
+ * the delta.  For this reason, we merge adjacent fragments if the unchanged
+ * region between them is <= MATCH_THRESHOLD bytes.
  *
  * The worst case for delta sizes occurs when we did not find any unchanged
  * region in the page.  The size of the delta will be the size of the page plus
@@ -41,16 +41,16 @@
  */
 #define FRAGMENT_HEADER_SIZE	(2 * sizeof(OffsetNumber))
 #define MATCH_THRESHOLD			FRAGMENT_HEADER_SIZE
-#define MAX_DELTA_SIZE			BLCKSZ + FRAGMENT_HEADER_SIZE
+#define MAX_DELTA_SIZE			(BLCKSZ + FRAGMENT_HEADER_SIZE)
 
 /* Struct of generic xlog data for single page */
 typedef struct
 {
 	Buffer		buffer;			/* registered buffer */
-	char		image[BLCKSZ];	/* copy of page image for modification */
-	char		data[MAX_DELTA_SIZE];	/* delta between page images */
-	int			dataLen;		/* space consumed in data field */
 	bool		fullImage;		/* are we taking a full image of this page? */
+	int			deltaLen;		/* space consumed in delta field */
+	char		image[BLCKSZ];	/* copy of page image for modification */
+	char		delta[MAX_DELTA_SIZE];	/* delta between page images */
 } PageData;
 
 /* State of generic xlog record construction */
@@ -61,22 +61,26 @@ struct GenericXLogState
 };
 
 static void writeFragment(PageData *pageData, OffsetNumber offset,
-			  OffsetNumber len, Pointer data);
-static void writeDelta(PageData *pageData);
-static void applyPageRedo(Page page, Pointer data, Size dataSize);
+			  OffsetNumber len, const char *data);
+static void computeDelta(PageData *pageData);
+static void applyPageRedo(Page page, const char *delta, Size deltaSize);
+
 
 /*
- * Write next fragment into delta.
+ * Write next fragment into pageData's delta.
+ *
+ * The fragment has the given offset and length, and data points to the
+ * actual data (of length length).
  */
 static void
 writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
-			  Pointer data)
+			  const char *data)
 {
-	Pointer		ptr = pageData->data + pageData->dataLen;
+	char	   *ptr = pageData->delta + pageData->deltaLen;
 
-	/* Check if we have enough space */
-	Assert(pageData->dataLen + sizeof(offset) +
-		   sizeof(length) + length <= sizeof(pageData->data));
+	/* Verify we have enough space */
+	Assert(pageData->deltaLen + sizeof(offset) +
+		   sizeof(length) + length <= sizeof(pageData->delta));
 
 	/* Write fragment data */
 	memcpy(ptr, &offset, sizeof(offset));
@@ -86,14 +90,14 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
 	memcpy(ptr, data, length);
 	ptr += length;
 
-	pageData->dataLen = ptr - pageData->data;
+	pageData->deltaLen = ptr - pageData->delta;
 }
 
 /*
- * Make delta for given page.
+ * Compute the delta record for given page.
  */
 static void
-writeDelta(PageData *pageData)
+computeDelta(PageData *pageData)
 {
 	Page		page = BufferGetPage(pageData->buffer, NULL, NULL,
 									 BGP_NO_SNAPSHOT_TEST),
@@ -106,6 +110,8 @@ writeDelta(PageData *pageData)
 				imageLower = ((PageHeader) image)->pd_lower,
 				imageUpper = ((PageHeader) image)->pd_upper;
 
+	pageData->deltaLen = 0;
+
 	for (i = 0; i < BLCKSZ; i++)
 	{
 		bool		match;
@@ -181,22 +187,22 @@ writeDelta(PageData *pageData)
 		char		tmp[BLCKSZ];
 
 		memcpy(tmp, image, BLCKSZ);
-		applyPageRedo(tmp, pageData->data, pageData->dataLen);
-		if (memcmp(tmp, page, pageLower)
-			|| memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper))
+		applyPageRedo(tmp, pageData->delta, pageData->deltaLen);
+		if (memcmp(tmp, page, pageLower) != 0 ||
+		  memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper) != 0)
 			elog(ERROR, "result of generic xlog apply does not match");
 	}
 #endif
 }
 
 /*
- * Start new generic xlog record.
+ * Start new generic xlog record for modifications to specified relation.
  */
 GenericXLogState *
 GenericXLogStart(Relation relation)
 {
-	int			i;
 	GenericXLogState *state;
+	int			i;
 
 	state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
 
@@ -209,24 +215,30 @@ GenericXLogStart(Relation relation)
 
 /*
  * Register new buffer for generic xlog record.
+ *
+ * Returns pointer to the page's image in the GenericXLogState, which
+ * is what the caller should modify.
+ *
+ * If the buffer is already registered, just return its existing entry.
  */
 Page
 GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
 {
 	int			block_id;
 
-	/* Place new buffer to unused slot in array */
+	/* Search array for existing entry or first unused slot */
 	for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
 	{
 		PageData   *page = &state->pages[block_id];
 
 		if (BufferIsInvalid(page->buffer))
 		{
+			/* Empty slot, so use it (there cannot be a match later) */
 			page->buffer = buffer;
-			memcpy(page->image, BufferGetPage(buffer, NULL, NULL,
-											  BGP_NO_SNAPSHOT_TEST), BLCKSZ);
-			page->dataLen = 0;
 			page->fullImage = isNew;
+			memcpy(page->image,
+				   BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
+				   BLCKSZ);
 			return (Page) page->image;
 		}
 		else if (page->buffer == buffer)
@@ -239,15 +251,16 @@ GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
 		}
 	}
 
-	elog(ERROR, "maximum number of %d generic xlog buffers is exceeded",
+	elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
 		 MAX_GENERIC_XLOG_PAGES);
-
 	/* keep compiler quiet */
 	return NULL;
 }
 
 /*
  * Unregister particular buffer for generic xlog record.
+ *
+ * XXX this is dangerous and should go away.
  */
 void
 GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
@@ -274,7 +287,8 @@ GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
 }
 
 /*
- * Put all changes in registered buffers to generic xlog record.
+ * Apply changes represented by GenericXLogState to the actual buffers,
+ * and emit a generic xlog record.
  */
 XLogRecPtr
 GenericXLogFinish(GenericXLogState *state)
@@ -291,33 +305,35 @@ GenericXLogFinish(GenericXLogState *state)
 
 		for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
 		{
+			PageData   *pageData = &state->pages[i];
+			Page		page;
 			char		tmp[BLCKSZ];
-			PageData   *page = &state->pages[i];
 
-			if (BufferIsInvalid(page->buffer))
+			if (BufferIsInvalid(pageData->buffer))
 				continue;
 
-			/* Swap current and saved page image. */
-			memcpy(tmp, page->image, BLCKSZ);
-			memcpy(page->image, BufferGetPage(page->buffer, NULL, NULL,
-											  BGP_NO_SNAPSHOT_TEST), BLCKSZ);
-			memcpy(BufferGetPage(page->buffer, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST), tmp, BLCKSZ);
+			page = BufferGetPage(pageData->buffer, NULL, NULL,
+								 BGP_NO_SNAPSHOT_TEST);
 
-			if (page->fullImage)
+			/* Swap current and saved page image. */
+			memcpy(tmp, pageData->image, BLCKSZ);
+			memcpy(pageData->image, page, BLCKSZ);
+			memcpy(page, tmp, BLCKSZ);
+
+			if (pageData->fullImage)
 			{
 				/* A full page image does not require anything special */
-				XLogRegisterBuffer(i, page->buffer, REGBUF_FORCE_IMAGE);
+				XLogRegisterBuffer(i, pageData->buffer, REGBUF_FORCE_IMAGE);
 			}
 			else
 			{
 				/*
-				 * In normal mode, calculate delta and write it as data
+				 * In normal mode, calculate delta and write it as xlog data
 				 * associated with this page.
 				 */
-				XLogRegisterBuffer(i, page->buffer, REGBUF_STANDARD);
-				writeDelta(page);
-				XLogRegisterBufData(i, page->data, page->dataLen);
+				XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
+				computeDelta(pageData);
+				XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
 			}
 		}
 
@@ -327,13 +343,13 @@ GenericXLogFinish(GenericXLogState *state)
 		/* Set LSN and mark buffers dirty */
 		for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
 		{
-			PageData   *page = &state->pages[i];
+			PageData   *pageData = &state->pages[i];
 
-			if (BufferIsInvalid(page->buffer))
+			if (BufferIsInvalid(pageData->buffer))
 				continue;
-			PageSetLSN(BufferGetPage(page->buffer, NULL, NULL,
+			PageSetLSN(BufferGetPage(pageData->buffer, NULL, NULL,
 									 BGP_NO_SNAPSHOT_TEST), lsn);
-			MarkBufferDirty(page->buffer);
+			MarkBufferDirty(pageData->buffer);
 		}
 		END_CRIT_SECTION();
 	}
@@ -343,13 +359,15 @@ GenericXLogFinish(GenericXLogState *state)
 		START_CRIT_SECTION();
 		for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
 		{
-			PageData   *page = &state->pages[i];
+			PageData   *pageData = &state->pages[i];
 
-			if (BufferIsInvalid(page->buffer))
+			if (BufferIsInvalid(pageData->buffer))
 				continue;
-			memcpy(BufferGetPage(page->buffer, NULL, NULL,
-								 BGP_NO_SNAPSHOT_TEST), page->image, BLCKSZ);
-			MarkBufferDirty(page->buffer);
+			memcpy(BufferGetPage(pageData->buffer, NULL, NULL,
+								 BGP_NO_SNAPSHOT_TEST),
+				   pageData->image,
+				   BLCKSZ);
+			MarkBufferDirty(pageData->buffer);
 		}
 		END_CRIT_SECTION();
 	}
@@ -360,7 +378,9 @@ GenericXLogFinish(GenericXLogState *state)
 }
 
 /*
- * Abort generic xlog record.
+ * Abort generic xlog record construction.  No changes are applied to buffers.
+ *
+ * Note: caller is responsible for releasing locks/pins on buffers, if needed.
  */
 void
 GenericXLogAbort(GenericXLogState *state)
@@ -372,10 +392,10 @@ GenericXLogAbort(GenericXLogState *state)
  * Apply delta to given page image.
  */
 static void
-applyPageRedo(Page page, Pointer data, Size dataSize)
+applyPageRedo(Page page, const char *delta, Size deltaSize)
 {
-	Pointer		ptr = data,
-				end = data + dataSize;
+	const char *ptr = delta;
+	const char *end = delta + deltaSize;
 
 	while (ptr < end)
 	{
@@ -399,10 +419,11 @@ applyPageRedo(Page page, Pointer data, Size dataSize)
 void
 generic_redo(XLogReaderState *record)
 {
-	uint8		block_id;
-	Buffer		buffers[MAX_GENERIC_XLOG_PAGES] = {InvalidBuffer};
 	XLogRecPtr	lsn = record->EndRecPtr;
+	Buffer		buffers[MAX_GENERIC_XLOG_PAGES];
+	uint8		block_id;
 
+	/* Protect limited size of buffers[] array */
 	Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
 
 	/* Iterate over blocks */
@@ -411,20 +432,24 @@ generic_redo(XLogReaderState *record)
 		XLogRedoAction action;
 
 		if (!XLogRecHasBlockRef(record, block_id))
+		{
+			buffers[block_id] = InvalidBuffer;
 			continue;
+		}
 
 		action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
 
 		/* Apply redo to given block if needed */
 		if (action == BLK_NEEDS_REDO)
 		{
-			Pointer		blockData;
-			Size		blockDataSize;
 			Page		page;
+			char	   *blockDelta;
+			Size		blockDeltaSize;
 
-			page = BufferGetPage(buffers[block_id], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
-			blockData = XLogRecGetBlockData(record, block_id, &blockDataSize);
-			applyPageRedo(page, blockData, blockDataSize);
+			page = BufferGetPage(buffers[block_id], NULL, NULL,
+								 BGP_NO_SNAPSHOT_TEST);
+			blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
+			applyPageRedo(page, blockDelta, blockDeltaSize);
 
 			PageSetLSN(page, lsn);
 			MarkBufferDirty(buffers[block_id]);