1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-11 10:01:57 +03:00

Revert recovery prefetching feature.

This set of commits has some bugs with known fixes, but at this late
stage in the release cycle it seems best to revert and resubmit next
time, along with some new automated test coverage for this whole area.

Commits reverted:

dc88460c: Doc: Review for "Optionally prefetch referenced data in recovery."
1d257577: Optionally prefetch referenced data in recovery.
f003d9f8: Add circular WAL decoding buffer.
323cbe7c: Remove read_page callback from XLogReader.

Remove the new GUC group WAL_RECOVERY recently added by a55a9847, as the
corresponding section of config.sgml is now reverted.

Discussion: https://postgr.es/m/CAOuzzgrn7iKnFRsB4MHp3UisEQAGgZMbk_ViTN4HV4-Ksq8zCg%40mail.gmail.com
This commit is contained in:
Thomas Munro
2021-05-10 16:00:53 +12:00
parent 63db0ac3f9
commit c2dc19342e
35 changed files with 859 additions and 3124 deletions

View File

@ -123,7 +123,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{
ReorderBufferAssignChild(ctx->reorder,
txid,
XLogRecGetXid(record),
record->decoded_record->xl_xid,
buf.origptr);
}

View File

@ -148,8 +148,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@ -199,12 +198,11 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
ctx->page_read = page_read;
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
@ -321,8 +319,7 @@ CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@ -425,7 +422,7 @@ CreateInitDecodingContext(const char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
page_read, cleanup_cb, prepare_write, do_write,
xl_routine, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@ -479,8 +476,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
LogicalDecodingXLogPageReadCB page_read,
WALSegmentCleanupCB cleanup_cb,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@ -532,8 +528,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
fast_forward, page_read, cleanup_cb,
prepare_write, do_write, update_progress);
fast_forward, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@ -589,13 +585,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;
/* the read_page callback waits for new WAL */
while (XLogReadRecord(ctx->reader, &record, &err) ==
XLREAD_NEED_DATA)
{
if (!ctx->page_read(ctx->reader))
break;
}
record = XLogReadRecord(ctx->reader, &err);
if (err)
elog(ERROR, "%s", err);
if (!record)

View File

@ -233,8 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
read_local_xlog_page,
wal_segment_close,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
@ -283,13 +284,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
XLogRecord *record;
char *errm = NULL;
while (XLogReadRecord(ctx->reader, &record, &errm) ==
XLREAD_NEED_DATA)
{
if (!ctx->page_read(ctx->reader))
break;
}
record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);

View File

@ -153,8 +153,9 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
restart_lsn,
read_local_xlog_page,
wal_segment_close,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
NULL, NULL, NULL);
/*
@ -511,8 +512,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
read_local_xlog_page,
wal_segment_close,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
NULL, NULL, NULL);
/*
@ -534,13 +536,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
while (XLogReadRecord(ctx->reader, &record, &errm) ==
XLREAD_NEED_DATA)
{
if (!ctx->page_read(ctx->reader))
break;
}
record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);

View File

@ -580,7 +580,10 @@ StartReplication(StartReplicationCmd *cmd)
/* create xlogreader for physical replication */
xlogreader =
XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
NULL);
if (!xlogreader)
ereport(ERROR,
@ -803,12 +806,10 @@ StartReplication(StartReplicationCmd *cmd)
* which has to do a plain sleep/busy loop, because the walsender's latch gets
* set every time WAL is flushed.
*/
static bool
logical_read_xlog_page(XLogReaderState *state)
static int
logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page)
{
XLogRecPtr targetPagePtr = state->readPagePtr;
int reqLen = state->reqLen;
char *cur_page = state->readBuf;
XLogRecPtr flushptr;
int count;
WALReadError errinfo;
@ -825,10 +826,7 @@ logical_read_xlog_page(XLogReaderState *state)
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
{
XLogReaderSetInputData(state, -1);
return false;
}
return -1;
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ; /* more than one block available */
@ -836,7 +834,7 @@ logical_read_xlog_page(XLogReaderState *state)
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
if (!WALRead(state, WalSndSegmentOpen, wal_segment_close,
if (!WALRead(state,
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
@ -856,8 +854,7 @@ logical_read_xlog_page(XLogReaderState *state)
XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
CheckXLogRemoved(segno, state->seg.ws_tli);
XLogReaderSetInputData(state, count);
return true;
return count;
}
/*
@ -1010,8 +1007,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
logical_read_xlog_page,
wal_segment_close,
XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@ -1169,8 +1167,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
logical_read_xlog_page,
wal_segment_close,
XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
xlogreader = logical_decoding_ctx->reader;
@ -2763,7 +2762,7 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close,
if (!WALRead(xlogreader,
&output_message.data[output_message.len],
startptr,
nbytes,
@ -2861,12 +2860,7 @@ XLogSendLogical(void)
*/
WalSndCaughtUp = false;
while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) ==
XLREAD_NEED_DATA)
{
if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader))
break;
}
record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
/* xlog record was invalid */
if (errm != NULL)