mirror of
https://github.com/postgres/postgres.git
synced 2025-05-02 11:44:50 +03:00
Teach xlogreader to follow timeline switches
Uses page-based mechanism to ensure we’re using the correct timeline. Tests are included to exercise the functionality using a cold disk-level copy of the master that's started up as a replica with slots intact, but the intended use of the functionality is with later features. Craig Ringer, reviewed by Simon Riggs and Andres Freund
This commit is contained in:
parent
9ca2dd578d
commit
1148e22a82
@ -19,6 +19,7 @@
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "access/timeline.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogutils.h"
|
||||
@ -662,6 +663,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
|
||||
/* state maintained across calls */
|
||||
static int sendFile = -1;
|
||||
static XLogSegNo sendSegNo = 0;
|
||||
static TimeLineID sendTLI = 0;
|
||||
static uint32 sendOff = 0;
|
||||
|
||||
p = buf;
|
||||
@ -677,7 +679,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
|
||||
startoff = recptr % XLogSegSize;
|
||||
|
||||
/* Do we need to switch to a different xlog segment? */
|
||||
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
|
||||
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
|
||||
sendTLI != tli)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
@ -704,6 +707,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
|
||||
path)));
|
||||
}
|
||||
sendOff = 0;
|
||||
sendTLI = tli;
|
||||
}
|
||||
|
||||
/* Need to seek in the file? */
|
||||
@ -753,6 +757,133 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Determine which timeline to read an xlog page from and set the
|
||||
* XLogReaderState's currTLI to that timeline ID.
|
||||
*
|
||||
* We care about timelines in xlogreader when we might be reading xlog
|
||||
* generated prior to a promotion, either if we're currently a standby in
|
||||
* recovery or if we're a promoted master reading xlogs generated by the old
|
||||
* master before our promotion.
|
||||
*
|
||||
* wantPage must be set to the start address of the page to read and
|
||||
* wantLength to the amount of the page that will be read, up to
|
||||
* XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
|
||||
*
|
||||
* We switch to an xlog segment from the new timeline eagerly when on a
|
||||
* historical timeline, as soon as we reach the start of the xlog segment
|
||||
* containing the timeline switch. The server copied the segment to the new
|
||||
* timeline so all the data up to the switch point is the same, but there's no
|
||||
* guarantee the old segment will still exist. It may have been deleted or
|
||||
* renamed with a .partial suffix so we can't necessarily keep reading from
|
||||
* the old TLI even though tliSwitchPoint says it's OK.
|
||||
*
|
||||
* We can't just check the timeline when we read a page on a different segment
|
||||
* to the last page. We could've received a timeline switch from a cascading
|
||||
* upstream, so the current segment ends apruptly (possibly getting renamed to
|
||||
* .partial) and we have to switch to a new one. Even in the middle of reading
|
||||
* a page we could have to dump the cached page and switch to a new TLI.
|
||||
*
|
||||
* Because of this, callers MAY NOT assume that currTLI is the timeline that
|
||||
* will be in a page's xlp_tli; the page may begin on an older timeline or we
|
||||
* might be reading from historical timeline data on a segment that's been
|
||||
* copied to a new timeline.
|
||||
*
|
||||
* The caller must also make sure it doesn't read past the current replay
|
||||
* position (using GetWalRcvWriteRecPtr) if executing in recovery, so it
|
||||
* doesn't fail to notice that the current timeline became historical. The
|
||||
* caller must also update ThisTimeLineID with the result of
|
||||
* GetWalRcvWriteRecPtr and must check RecoveryInProgress().
|
||||
*/
|
||||
void
|
||||
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
|
||||
{
|
||||
const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
|
||||
|
||||
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
|
||||
Assert(wantLength <= XLOG_BLCKSZ);
|
||||
Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
|
||||
|
||||
/*
|
||||
* If the desired page is currently read in and valid, we have nothing to do.
|
||||
*
|
||||
* The caller should've ensured that it didn't previously advance readOff
|
||||
* past the valid limit of this timeline, so it doesn't matter if the current
|
||||
* TLI has since become historical.
|
||||
*/
|
||||
if (lastReadPage == wantPage &&
|
||||
state->readLen != 0 &&
|
||||
lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
|
||||
return;
|
||||
|
||||
/*
|
||||
* If we're reading from the current timeline, it hasn't become historical
|
||||
* and the page we're reading is after the last page read, we can again
|
||||
* just carry on. (Seeking backwards requires a check to make sure the older
|
||||
* page isn't on a prior timeline).
|
||||
*
|
||||
* ThisTimeLineID might've become historical since we last looked, but the
|
||||
* caller is required not to read past the flush limit it saw at the time
|
||||
* it looked up the timeline. There's nothing we can do about it if
|
||||
* StartupXLOG() renames it to .partial concurrently.
|
||||
*/
|
||||
if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
|
||||
{
|
||||
Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* If we're just reading pages from a previously validated historical
|
||||
* timeline and the timeline we're reading from is valid until the
|
||||
* end of the current segment we can just keep reading.
|
||||
*/
|
||||
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
|
||||
state->currTLI != ThisTimeLineID &&
|
||||
state->currTLI != 0 &&
|
||||
(wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
|
||||
return;
|
||||
|
||||
/*
|
||||
* If we reach this point we're either looking up a page for random access,
|
||||
* the current timeline just became historical, or we're reading from a new
|
||||
* segment containing a timeline switch. In all cases we need to determine
|
||||
* the newest timeline on the segment.
|
||||
*
|
||||
* If it's the current timeline we can just keep reading from here unless
|
||||
* we detect a timeline switch that makes the current timeline historical.
|
||||
* If it's a historical timeline we can read all the segment on the newest
|
||||
* timeline because it contains all the old timelines' data too. So only
|
||||
* one switch check is required.
|
||||
*/
|
||||
{
|
||||
/*
|
||||
* We need to re-read the timeline history in case it's been changed
|
||||
* by a promotion or replay from a cascaded replica.
|
||||
*/
|
||||
List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
|
||||
|
||||
XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
|
||||
|
||||
Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
|
||||
|
||||
/* Find the timeline of the last LSN on the segment containing wantPage. */
|
||||
state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
|
||||
state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
|
||||
&state->nextTLI);
|
||||
|
||||
Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
|
||||
wantPage + wantLength < state->currTLIValidUntil);
|
||||
|
||||
list_free_deep(timelineHistory);
|
||||
|
||||
elog(DEBUG3, "switched to timeline %u valid until %X/%X",
|
||||
state->currTLI,
|
||||
(uint32)(state->currTLIValidUntil >> 32),
|
||||
(uint32)(state->currTLIValidUntil));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* read_page callback for reading local xlog files
|
||||
*
|
||||
@ -774,28 +905,84 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
|
||||
int count;
|
||||
|
||||
loc = targetPagePtr + reqLen;
|
||||
|
||||
/* Loop waiting for xlog to be available if necessary */
|
||||
while (1)
|
||||
{
|
||||
/*
|
||||
* TODO: we're going to have to do something more intelligent about
|
||||
* timelines on standbys. Use readTimeLineHistory() and
|
||||
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
|
||||
* that case earlier, but the code and TODO is left in here for when
|
||||
* that changes.
|
||||
* Determine the limit of xlog we can currently read to, and what the
|
||||
* most recent timeline is.
|
||||
*
|
||||
* RecoveryInProgress() will update ThisTimeLineID when it first
|
||||
* notices recovery finishes, so we only have to maintain it for the
|
||||
* local process until recovery ends.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
*pageTLI = ThisTimeLineID;
|
||||
read_upto = GetFlushRecPtr();
|
||||
else
|
||||
read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
|
||||
|
||||
*pageTLI = ThisTimeLineID;
|
||||
|
||||
/*
|
||||
* Check which timeline to get the record from.
|
||||
*
|
||||
* We have to do it each time through the loop because if we're in
|
||||
* recovery as a cascading standby, the current timeline might've
|
||||
* become historical. We can't rely on RecoveryInProgress() because
|
||||
* in a standby configuration like
|
||||
*
|
||||
* A => B => C
|
||||
*
|
||||
* if we're a logical decoding session on C, and B gets promoted, our
|
||||
* timeline will change while we remain in recovery.
|
||||
*
|
||||
* We can't just keep reading from the old timeline as the last WAL
|
||||
* archive in the timeline will get renamed to .partial by StartupXLOG().
|
||||
*
|
||||
* If that happens after our caller updated ThisTimeLineID but before
|
||||
* we actually read the xlog page, we might still try to read from the
|
||||
* old (now renamed) segment and fail. There's not much we can do about
|
||||
* this, but it can only happen when we're a leaf of a cascading
|
||||
* standby whose master gets promoted while we're decoding, so a
|
||||
* one-off ERROR isn't too bad.
|
||||
*/
|
||||
XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
|
||||
|
||||
if (state->currTLI == ThisTimeLineID)
|
||||
{
|
||||
|
||||
if (loc <= read_upto)
|
||||
break;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
else
|
||||
read_upto = GetXLogReplayRecPtr(pageTLI);
|
||||
{
|
||||
/*
|
||||
* We're on a historical timeline, so limit reading to the switch
|
||||
* point where we moved to the next timeline.
|
||||
*
|
||||
* We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
|
||||
* about the new timeline, so we must've received past the end of
|
||||
* it.
|
||||
*/
|
||||
read_upto = state->currTLIValidUntil;
|
||||
|
||||
if (loc <= read_upto)
|
||||
/*
|
||||
* Setting pageTLI to our wanted record's TLI is slightly wrong;
|
||||
* the page might begin on an older timeline if it contains a
|
||||
* timeline switch, since its xlog segment will have been copied
|
||||
* from the prior timeline. This is pretty harmless though, as
|
||||
* nothing cares so long as the timeline doesn't go backwards. We
|
||||
* should read the page header instead; FIXME someday.
|
||||
*/
|
||||
*pageTLI = state->currTLI;
|
||||
|
||||
/* No need to wait on a historical timeline */
|
||||
break;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
}
|
||||
|
||||
if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
|
||||
|
@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
|
||||
rsinfo->setResult = p->tupstore;
|
||||
rsinfo->setDesc = p->tupdesc;
|
||||
|
||||
/* compute the current end-of-wal */
|
||||
/*
|
||||
* Compute the current end-of-wal and maintain ThisTimeLineID.
|
||||
* RecoveryInProgress() will update ThisTimeLineID on promotion.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
end_of_wal = GetFlushRecPtr();
|
||||
else
|
||||
end_of_wal = GetXLogReplayRecPtr(NULL);
|
||||
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
|
||||
|
||||
ReplicationSlotAcquire(NameStr(*name));
|
||||
|
||||
@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
|
||||
/* invalidate non-timetravel entries */
|
||||
InvalidateSystemCaches();
|
||||
|
||||
/* Decode until we run out of records */
|
||||
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
|
||||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
|
||||
{
|
||||
|
@ -48,6 +48,7 @@
|
||||
#include "access/transam.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogutils.h"
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/dbcommands.h"
|
||||
@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
|
||||
XLogRecPtr flushptr;
|
||||
int count;
|
||||
|
||||
XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
|
||||
sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
|
||||
sendTimeLine = state->currTLI;
|
||||
sendTimeLineValidUpto = state->currTLIValidUntil;
|
||||
sendTimeLineNextTLI = state->nextTLI;
|
||||
|
||||
/* make sure we have enough WAL available */
|
||||
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
|
||||
|
||||
@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
|
||||
pq_endmessage(&buf);
|
||||
pq_flush();
|
||||
|
||||
/* setup state for XLogReadPage */
|
||||
sendTimeLineIsHistoric = false;
|
||||
sendTimeLine = ThisTimeLineID;
|
||||
|
||||
/*
|
||||
* Initialize position to the last ack'ed one, then the xlog records begin
|
||||
* to be shipped from that position.
|
||||
|
@ -161,6 +161,22 @@ struct XLogReaderState
|
||||
|
||||
/* beginning of the WAL record being read. */
|
||||
XLogRecPtr currRecPtr;
|
||||
/* timeline to read it from, 0 if a lookup is required */
|
||||
TimeLineID currTLI;
|
||||
/*
|
||||
* Safe point to read to in currTLI if current TLI is historical
|
||||
* (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
|
||||
*
|
||||
* Actually set to the start of the segment containing the timeline
|
||||
* switch that ends currTLI's validity, not the LSN of the switch
|
||||
* its self, since we can't assume the old segment will be present.
|
||||
*/
|
||||
XLogRecPtr currTLIValidUntil;
|
||||
/*
|
||||
* If currTLI is not the most recent known timeline, the next timeline to
|
||||
* read from when currTLIValidUntil is reached.
|
||||
*/
|
||||
TimeLineID nextTLI;
|
||||
|
||||
/* Buffer for current ReadRecord result (expandable) */
|
||||
char *readRecordBuf;
|
||||
|
@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
|
||||
XLogRecPtr targetRecPtr, char *cur_page,
|
||||
TimeLineID *pageTLI);
|
||||
|
||||
extern void XLogReadDetermineTimeline(XLogReaderState *state,
|
||||
XLogRecPtr wantPage, uint32 wantLength);
|
||||
|
||||
#endif
|
||||
|
@ -9,6 +9,8 @@
|
||||
#
|
||||
#-------------------------------------------------------------------------
|
||||
|
||||
EXTRA_INSTALL=contrib/test_decoding
|
||||
|
||||
subdir = src/test/recovery
|
||||
top_builddir = ../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
130
src/test/recovery/t/010_logical_decoding_timelines.pl
Normal file
130
src/test/recovery/t/010_logical_decoding_timelines.pl
Normal file
@ -0,0 +1,130 @@
|
||||
# Demonstrate that logical can follow timeline switches.
|
||||
#
|
||||
# Logical replication slots can follow timeline switches but it's
|
||||
# normally not possible to have a logical slot on a replica where
|
||||
# promotion and a timeline switch can occur. The only ways
|
||||
# we can create that circumstance are:
|
||||
#
|
||||
# * By doing a filesystem-level copy of the DB, since pg_basebackup
|
||||
# excludes pg_replslot but we can copy it directly; or
|
||||
#
|
||||
# * by creating a slot directly at the C level on the replica and
|
||||
# advancing it as we go using the low level APIs. It can't be done
|
||||
# from SQL since logical decoding isn't allowed on replicas.
|
||||
#
|
||||
# This module uses the first approach to show that timeline following
|
||||
# on a logical slot works.
|
||||
#
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use PostgresNode;
|
||||
use TestLib;
|
||||
use Test::More tests => 7;
|
||||
use RecursiveCopy;
|
||||
use File::Copy;
|
||||
use IPC::Run ();
|
||||
use Scalar::Util qw(blessed);
|
||||
|
||||
my ($stdout, $stderr, $ret);
|
||||
|
||||
# Initialize master node
|
||||
my $node_master = get_new_node('master');
|
||||
$node_master->init(allows_streaming => 1, has_archiving => 1);
|
||||
$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
|
||||
$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
|
||||
$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
|
||||
$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
|
||||
$node_master->dump_info;
|
||||
$node_master->start;
|
||||
|
||||
diag "Testing logical timeline following with a filesystem-level copy";
|
||||
|
||||
$node_master->safe_psql('postgres',
|
||||
"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
|
||||
);
|
||||
$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
|
||||
$node_master->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('beforebb');");
|
||||
$node_master->safe_psql('postgres', 'CHECKPOINT;');
|
||||
|
||||
my $backup_name = 'b1';
|
||||
$node_master->backup_fs_hot($backup_name);
|
||||
|
||||
my $node_replica = get_new_node('replica');
|
||||
$node_replica->init_from_backup(
|
||||
$node_master, $backup_name,
|
||||
has_streaming => 1,
|
||||
has_restoring => 1);
|
||||
$node_replica->start;
|
||||
|
||||
$node_master->safe_psql('postgres',
|
||||
"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
|
||||
);
|
||||
$node_master->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('afterbb');");
|
||||
$node_master->safe_psql('postgres', 'CHECKPOINT;');
|
||||
|
||||
# Verify that only the before base_backup slot is on the replica
|
||||
$stdout = $node_replica->safe_psql('postgres',
|
||||
'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
|
||||
is($stdout, 'before_basebackup',
|
||||
'Expected to find only slot before_basebackup on replica');
|
||||
|
||||
# Boom, crash
|
||||
$node_master->stop('immediate');
|
||||
|
||||
$node_replica->promote;
|
||||
$node_replica->poll_query_until('postgres',
|
||||
"SELECT NOT pg_is_in_recovery();");
|
||||
|
||||
$node_replica->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('after failover');");
|
||||
|
||||
# Shouldn't be able to read from slot created after base backup
|
||||
($ret, $stdout, $stderr) = $node_replica->psql('postgres',
|
||||
"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
|
||||
);
|
||||
is($ret, 3, 'replaying from after_basebackup slot fails');
|
||||
like(
|
||||
$stderr,
|
||||
qr/replication slot "after_basebackup" does not exist/,
|
||||
'after_basebackup slot missing');
|
||||
|
||||
# Should be able to read from slot created before base backup
|
||||
($ret, $stdout, $stderr) = $node_replica->psql(
|
||||
'postgres',
|
||||
"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
|
||||
timeout => 30);
|
||||
is($ret, 0, 'replay from slot before_basebackup succeeds');
|
||||
|
||||
my $final_expected_output_bb = q(BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'beforebb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'afterbb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'after failover'
|
||||
COMMIT);
|
||||
is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
|
||||
is($stderr, '', 'replay from slot before_basebackup produces no stderr');
|
||||
|
||||
# So far we've peeked the slots, so when we fetch the same info over
|
||||
# pg_recvlogical we should get complete results. First, find out the commit lsn
|
||||
# of the last transaction. There's no max(pg_lsn), so:
|
||||
|
||||
my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
|
||||
|
||||
# now use the walsender protocol to peek the slot changes and make sure we see
|
||||
# the same results.
|
||||
|
||||
$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
|
||||
$endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1');
|
||||
|
||||
# walsender likes to add a newline
|
||||
chomp($stdout);
|
||||
is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
|
||||
|
||||
# We don't need the standby anymore
|
||||
$node_replica->teardown_node();
|
Loading…
x
Reference in New Issue
Block a user