diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b2b9fcbebb0..28c07d37c17 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,6 +19,7 @@ #include +#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -662,6 +663,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; + static TimeLineID sendTLI = 0; static uint32 sendOff = 0; p = buf; @@ -677,7 +679,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || + sendTLI != tli) { char path[MAXPGPATH]; @@ -704,6 +707,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; + sendTLI = tli; } /* Need to seek in the file? */ @@ -753,6 +757,133 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) } } +/* + * Determine which timeline to read an xlog page from and set the + * XLogReaderState's currTLI to that timeline ID. + * + * We care about timelines in xlogreader when we might be reading xlog + * generated prior to a promotion, either if we're currently a standby in + * recovery or if we're a promoted master reading xlogs generated by the old + * master before our promotion. + * + * wantPage must be set to the start address of the page to read and + * wantLength to the amount of the page that will be read, up to + * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ. + * + * We switch to an xlog segment from the new timeline eagerly when on a + * historical timeline, as soon as we reach the start of the xlog segment + * containing the timeline switch. The server copied the segment to the new + * timeline so all the data up to the switch point is the same, but there's no + * guarantee the old segment will still exist. It may have been deleted or + * renamed with a .partial suffix so we can't necessarily keep reading from + * the old TLI even though tliSwitchPoint says it's OK. + * + * We can't just check the timeline when we read a page on a different segment + * to the last page. We could've received a timeline switch from a cascading + * upstream, so the current segment ends apruptly (possibly getting renamed to + * .partial) and we have to switch to a new one. Even in the middle of reading + * a page we could have to dump the cached page and switch to a new TLI. + * + * Because of this, callers MAY NOT assume that currTLI is the timeline that + * will be in a page's xlp_tli; the page may begin on an older timeline or we + * might be reading from historical timeline data on a segment that's been + * copied to a new timeline. + * + * The caller must also make sure it doesn't read past the current replay + * position (using GetWalRcvWriteRecPtr) if executing in recovery, so it + * doesn't fail to notice that the current timeline became historical. The + * caller must also update ThisTimeLineID with the result of + * GetWalRcvWriteRecPtr and must check RecoveryInProgress(). + */ +void +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +{ + const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff; + + Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); + Assert(wantLength <= XLOG_BLCKSZ); + Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ); + + /* + * If the desired page is currently read in and valid, we have nothing to do. + * + * The caller should've ensured that it didn't previously advance readOff + * past the valid limit of this timeline, so it doesn't matter if the current + * TLI has since become historical. + */ + if (lastReadPage == wantPage && + state->readLen != 0 && + lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1)) + return; + + /* + * If we're reading from the current timeline, it hasn't become historical + * and the page we're reading is after the last page read, we can again + * just carry on. (Seeking backwards requires a check to make sure the older + * page isn't on a prior timeline). + * + * ThisTimeLineID might've become historical since we last looked, but the + * caller is required not to read past the flush limit it saw at the time + * it looked up the timeline. There's nothing we can do about it if + * StartupXLOG() renames it to .partial concurrently. + */ + if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + { + Assert(state->currTLIValidUntil == InvalidXLogRecPtr); + return; + } + + /* + * If we're just reading pages from a previously validated historical + * timeline and the timeline we're reading from is valid until the + * end of the current segment we can just keep reading. + */ + if (state->currTLIValidUntil != InvalidXLogRecPtr && + state->currTLI != ThisTimeLineID && + state->currTLI != 0 && + (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize) + return; + + /* + * If we reach this point we're either looking up a page for random access, + * the current timeline just became historical, or we're reading from a new + * segment containing a timeline switch. In all cases we need to determine + * the newest timeline on the segment. + * + * If it's the current timeline we can just keep reading from here unless + * we detect a timeline switch that makes the current timeline historical. + * If it's a historical timeline we can read all the segment on the newest + * timeline because it contains all the old timelines' data too. So only + * one switch check is required. + */ + { + /* + * We need to re-read the timeline history in case it's been changed + * by a promotion or replay from a cascaded replica. + */ + List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + + XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1; + + Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize); + + /* Find the timeline of the last LSN on the segment containing wantPage. */ + state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory); + state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory, + &state->nextTLI); + + Assert(state->currTLIValidUntil == InvalidXLogRecPtr || + wantPage + wantLength < state->currTLIValidUntil); + + list_free_deep(timelineHistory); + + elog(DEBUG3, "switched to timeline %u valid until %X/%X", + state->currTLI, + (uint32)(state->currTLIValidUntil >> 32), + (uint32)(state->currTLIValidUntil)); + } +} + /* * read_page callback for reading local xlog files * @@ -774,28 +905,84 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int count; loc = targetPagePtr + reqLen; + + /* Loop waiting for xlog to be available if necessary */ while (1) { /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Determine the limit of xlog we can currently read to, and what the + * most recent timeline is. + * + * RecoveryInProgress() will update ThisTimeLineID when it first + * notices recovery finishes, so we only have to maintain it for the + * local process until recovery ends. */ if (!RecoveryInProgress()) - { - *pageTLI = ThisTimeLineID; read_upto = GetFlushRecPtr(); + else + read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); + + *pageTLI = ThisTimeLineID; + + /* + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. We can't rely on RecoveryInProgress() because + * in a standby configuration like + * + * A => B => C + * + * if we're a logical decoding session on C, and B gets promoted, our + * timeline will change while we remain in recovery. + * + * We can't just keep reading from the old timeline as the last WAL + * archive in the timeline will get renamed to .partial by StartupXLOG(). + * + * If that happens after our caller updated ThisTimeLineID but before + * we actually read the xlog page, we might still try to read from the + * old (now renamed) segment and fail. There's not much we can do about + * this, but it can only happen when we're a leaf of a cascading + * standby whose master gets promoted while we're decoding, so a + * one-off ERROR isn't too bad. + */ + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + + if (state->currTLI == ThisTimeLineID) + { + + if (loc <= read_upto) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } else - read_upto = GetXLogReplayRecPtr(pageTLI); + { + /* + * We're on a historical timeline, so limit reading to the switch + * point where we moved to the next timeline. + * + * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know + * about the new timeline, so we must've received past the end of + * it. + */ + read_upto = state->currTLIValidUntil; - if (loc <= read_upto) + /* + * Setting pageTLI to our wanted record's TLI is slightly wrong; + * the page might begin on an older timeline if it contains a + * timeline switch, since its xlog segment will have been copied + * from the prior timeline. This is pretty harmless though, as + * nothing cares so long as the timeline doesn't go backwards. We + * should read the page header instead; FIXME someday. + */ + *pageTLI = state->currTLI; + + /* No need to wait on a historical timeline */ break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + } } if (targetPagePtr + XLOG_BLCKSZ <= read_upto) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 41c50005d7f..c251b92f57b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; - /* compute the current end-of-wal */ + /* + * Compute the current end-of-wal and maintain ThisTimeLineID. + * RecoveryInProgress() will update ThisTimeLineID on promotion. + */ if (!RecoveryInProgress()) end_of_wal = GetFlushRecPtr(); else - end_of_wal = GetXLogReplayRecPtr(NULL); + end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); ReplicationSlotAcquire(NameStr(*name)); @@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0f6b828336f..75617709ecf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -48,6 +48,7 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLogRecPtr flushptr; int count; + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); + sendTimeLine = state->currTLI; + sendTimeLineValidUpto = state->currTLIValidUntil; + sendTimeLineNextTLI = state->nextTLI; + /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* setup state for XLogReadPage */ - sendTimeLineIsHistoric = false; - sendTimeLine = ThisTimeLineID; - /* * Initialize position to the last ack'ed one, then the xlog records begin * to be shipped from that position. diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 663d3e7890b..a1beeb54965 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -161,6 +161,22 @@ struct XLogReaderState /* beginning of the WAL record being read. */ XLogRecPtr currRecPtr; + /* timeline to read it from, 0 if a lookup is required */ + TimeLineID currTLI; + /* + * Safe point to read to in currTLI if current TLI is historical + * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline. + * + * Actually set to the start of the segment containing the timeline + * switch that ends currTLI's validity, not the LSN of the switch + * its self, since we can't assume the old segment will be present. + */ + XLogRecPtr currTLIValidUntil; + /* + * If currTLI is not the most recent known timeline, the next timeline to + * read from when currTLIValidUntil is reached. + */ + TimeLineID nextTLI; /* Buffer for current ReadRecord result (expandable) */ char *readRecordBuf; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 567a7f3d871..25a99422c1a 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); +extern void XLogReadDetermineTimeline(XLogReaderState *state, + XLogRecPtr wantPage, uint32 wantLength); + #endif diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index 9d03d337d54..142a1b8de2e 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -9,6 +9,8 @@ # #------------------------------------------------------------------------- +EXTRA_INSTALL=contrib/test_decoding + subdir = src/test/recovery top_builddir = ../../.. include $(top_builddir)/src/Makefile.global diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl new file mode 100644 index 00000000000..09830dc39ce --- /dev/null +++ b/src/test/recovery/t/010_logical_decoding_timelines.pl @@ -0,0 +1,130 @@ +# Demonstrate that logical can follow timeline switches. +# +# Logical replication slots can follow timeline switches but it's +# normally not possible to have a logical slot on a replica where +# promotion and a timeline switch can occur. The only ways +# we can create that circumstance are: +# +# * By doing a filesystem-level copy of the DB, since pg_basebackup +# excludes pg_replslot but we can copy it directly; or +# +# * by creating a slot directly at the C level on the replica and +# advancing it as we go using the low level APIs. It can't be done +# from SQL since logical decoding isn't allowed on replicas. +# +# This module uses the first approach to show that timeline following +# on a logical slot works. +# +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 7; +use RecursiveCopy; +use File::Copy; +use IPC::Run (); +use Scalar::Util qw(blessed); + +my ($stdout, $stderr, $ret); + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); +$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); +$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); +$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n"); +$node_master->dump_info; +$node_master->start; + +diag "Testing logical timeline following with a filesystem-level copy"; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');" +); +$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('beforebb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +my $backup_name = 'b1'; +$node_master->backup_fs_hot($backup_name); + +my $node_replica = get_new_node('replica'); +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_replica->start; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" +); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('afterbb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +# Verify that only the before base_backup slot is on the replica +$stdout = $node_replica->safe_psql('postgres', + 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is($stdout, 'before_basebackup', + 'Expected to find only slot before_basebackup on replica'); + +# Boom, crash +$node_master->stop('immediate'); + +$node_replica->promote; +$node_replica->poll_query_until('postgres', + "SELECT NOT pg_is_in_recovery();"); + +$node_replica->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('after failover');"); + +# Shouldn't be able to read from slot created after base backup +($ret, $stdout, $stderr) = $node_replica->psql('postgres', +"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');" +); +is($ret, 3, 'replaying from after_basebackup slot fails'); +like( + $stderr, + qr/replication slot "after_basebackup" does not exist/, + 'after_basebackup slot missing'); + +# Should be able to read from slot created before base backup +($ret, $stdout, $stderr) = $node_replica->psql( + 'postgres', +"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", + timeout => 30); +is($ret, 0, 'replay from slot before_basebackup succeeds'); + +my $final_expected_output_bb = q(BEGIN +table public.decoding: INSERT: blah[text]:'beforebb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT); +is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup'); +is($stderr, '', 'replay from slot before_basebackup produces no stderr'); + +# So far we've peeked the slots, so when we fetch the same info over +# pg_recvlogical we should get complete results. First, find out the commit lsn +# of the last transaction. There's no max(pg_lsn), so: + +my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;"); + +# now use the walsender protocol to peek the slot changes and make sure we see +# the same results. + +$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup', + $endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1'); + +# walsender likes to add a newline +chomp($stdout); +is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup'); + +# We don't need the standby anymore +$node_replica->teardown_node();