mirror of
https://github.com/postgres/postgres.git
synced 2025-06-25 01:02:05 +03:00
Revert timeline following in replication slots
This reverts commitsf07d18b6e9
,82c83b3372
,3a3b309041
, and24c5f1a103
. This feature has shown enough immaturity that it was deemed better to rip it out before rushing some more fixes at the last minute. There are discussions on larger changes in this area for the next release.
This commit is contained in:
@ -14,7 +14,6 @@ SUBDIRS = \
|
||||
test_parser \
|
||||
test_rls_hooks \
|
||||
test_shm_mq \
|
||||
test_slot_timelines \
|
||||
worker_spi
|
||||
|
||||
all: submake-errcodes
|
||||
|
@ -1,3 +0,0 @@
|
||||
results/
|
||||
tmp_check/
|
||||
log/
|
@ -1,22 +0,0 @@
|
||||
# src/test/modules/test_slot_timelines/Makefile
|
||||
|
||||
MODULES = test_slot_timelines
|
||||
PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
|
||||
|
||||
EXTENSION = test_slot_timelines
|
||||
DATA = test_slot_timelines--1.0.sql
|
||||
|
||||
EXTRA_INSTALL=contrib/test_decoding
|
||||
REGRESS=load_extension
|
||||
REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
|
||||
|
||||
ifdef USE_PGXS
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
else
|
||||
subdir = src/test/modules/test_slot_timelines
|
||||
top_builddir = ../../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
include $(top_srcdir)/contrib/contrib-global.mk
|
||||
endif
|
@ -1,19 +0,0 @@
|
||||
A test module for logical decoding failover and timeline following.
|
||||
|
||||
This module provides a minimal way to maintain logical slots on replicas
|
||||
that mirror the state on the master. It doesn't make decoding possible,
|
||||
just tracking slot state so that a decoding client that's using the master
|
||||
can follow a physical failover to the standby. The master doesn't know
|
||||
about the slots on the standby, they're synced by a client that connects
|
||||
to both.
|
||||
|
||||
This is intentionally not part of the test_decoding module because that's meant
|
||||
to serve as example code, where this module exercises internal server features
|
||||
by unsafely exposing internal state to SQL. It's not the right way to do
|
||||
failover, it's just a simple way to test it from the perl TAP framework to
|
||||
prove the feature works.
|
||||
|
||||
In a practical implementation of this approach a bgworker on the master would
|
||||
monitor slot positions and relay them to a bgworker on the standby that applies
|
||||
the position updates without exposing slot internals to SQL. That's too complex
|
||||
for this test framework though.
|
@ -1,19 +0,0 @@
|
||||
CREATE EXTENSION test_slot_timelines;
|
||||
SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
|
||||
test_slot_timelines_create_logical_slot
|
||||
-----------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
|
||||
test_slot_timelines_advance_logical_slot
|
||||
------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT pg_drop_replication_slot('test_slot');
|
||||
pg_drop_replication_slot
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
@ -1,7 +0,0 @@
|
||||
CREATE EXTENSION test_slot_timelines;
|
||||
SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
|
||||
ERROR: replication slots can only be used if max_replication_slots > 0
|
||||
SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
|
||||
ERROR: replication slots can only be used if max_replication_slots > 0
|
||||
SELECT pg_drop_replication_slot('test_slot');
|
||||
ERROR: replication slots can only be used if max_replication_slots > 0
|
@ -1,7 +0,0 @@
|
||||
CREATE EXTENSION test_slot_timelines;
|
||||
|
||||
SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
|
||||
|
||||
SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location());
|
||||
|
||||
SELECT pg_drop_replication_slot('test_slot');
|
@ -1,16 +0,0 @@
|
||||
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
|
||||
\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
|
||||
|
||||
CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
|
||||
RETURNS void
|
||||
STRICT LANGUAGE c AS 'MODULE_PATHNAME';
|
||||
|
||||
COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
|
||||
IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
|
||||
|
||||
CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin xid, new_catalog_xmin xid, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
|
||||
RETURNS void
|
||||
STRICT LANGUAGE c AS 'MODULE_PATHNAME';
|
||||
|
||||
COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, xid, xid, pg_lsn, pg_lsn)
|
||||
IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
|
@ -1,133 +0,0 @@
|
||||
/*--------------------------------------------------------------------------
|
||||
*
|
||||
* test_slot_timelines.c
|
||||
* Test harness code for slot timeline following
|
||||
*
|
||||
* Copyright (c) 2016, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/test/modules/test_slot_timelines/test_slot_timelines.c
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/transam.h"
|
||||
#include "fmgr.h"
|
||||
#include "miscadmin.h"
|
||||
#include "replication/slot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
|
||||
PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
|
||||
|
||||
static void clear_slot_transient_state(void);
|
||||
|
||||
/*
|
||||
* Create a new logical slot, with invalid LSN and xid, directly. This does not
|
||||
* use the snapshot builder or logical decoding machinery. It's only intended
|
||||
* for creating a slot on a replica that mirrors the state of a slot on an
|
||||
* upstream master.
|
||||
*
|
||||
* Note that this is test harness code. You shouldn't expose slot internals
|
||||
* to SQL like this for any real world usage. See the README.
|
||||
*/
|
||||
Datum
|
||||
test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
|
||||
char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
|
||||
|
||||
CheckSlotRequirements();
|
||||
|
||||
ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
|
||||
|
||||
/* register the plugin name with the slot */
|
||||
StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
|
||||
|
||||
/*
|
||||
* Initialize persistent state to placeholders to be set by
|
||||
* test_slot_timelines_advance_logical_slot .
|
||||
*/
|
||||
MyReplicationSlot->data.xmin = InvalidTransactionId;
|
||||
MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
|
||||
MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
|
||||
MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
|
||||
|
||||
clear_slot_transient_state();
|
||||
|
||||
ReplicationSlotRelease();
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* Set the state of a slot.
|
||||
*
|
||||
* This doesn't maintain the non-persistent state at all,
|
||||
* but since the slot isn't in use that's OK.
|
||||
*
|
||||
* There's intentionally no check to prevent slots going backwards
|
||||
* because they can actually go backwards if the master crashes when
|
||||
* it hasn't yet flushed slot state to disk then we copy the older
|
||||
* slot state after recovery.
|
||||
*
|
||||
* There's no checking done for xmin or catalog xmin either, since
|
||||
* we can't really do anything useful that accounts for xid wrap-around.
|
||||
*
|
||||
* Note that this is test harness code. You shouldn't expose slot internals
|
||||
* to SQL like this for any real world usage. See the README.
|
||||
*/
|
||||
Datum
|
||||
test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
|
||||
TransactionId new_xmin = DatumGetTransactionId(PG_GETARG_DATUM(1));
|
||||
TransactionId new_catalog_xmin = DatumGetTransactionId(PG_GETARG_DATUM(2));
|
||||
XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
|
||||
XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
|
||||
|
||||
CheckSlotRequirements();
|
||||
|
||||
ReplicationSlotAcquire(slotname);
|
||||
|
||||
if (MyReplicationSlot->data.database != MyDatabaseId)
|
||||
elog(ERROR, "trying to update a slot on a different database");
|
||||
|
||||
MyReplicationSlot->data.xmin = new_xmin;
|
||||
MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
|
||||
MyReplicationSlot->data.restart_lsn = restart_lsn;
|
||||
MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
|
||||
|
||||
clear_slot_transient_state();
|
||||
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotSave();
|
||||
ReplicationSlotRelease();
|
||||
|
||||
ReplicationSlotsComputeRequiredXmin(false);
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
static void
|
||||
clear_slot_transient_state(void)
|
||||
{
|
||||
Assert(MyReplicationSlot != NULL);
|
||||
|
||||
/*
|
||||
* Make sure the slot state is the same as if it were newly loaded from
|
||||
* disk on recovery.
|
||||
*/
|
||||
MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
|
||||
MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
|
||||
|
||||
MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
|
||||
MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
|
||||
MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
|
||||
MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
|
||||
}
|
@ -1,2 +0,0 @@
|
||||
max_replication_slots=2
|
||||
wal_level=logical
|
@ -1,5 +0,0 @@
|
||||
# test_slot_timelines extension
|
||||
comment = 'Test utility for slot timeline following and logical decoding'
|
||||
default_version = '1.0'
|
||||
module_pathname = '$libdir/test_slot_timelines'
|
||||
relocatable = true
|
@ -9,8 +9,6 @@
|
||||
#
|
||||
#-------------------------------------------------------------------------
|
||||
|
||||
EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
|
||||
|
||||
subdir = src/test/recovery
|
||||
top_builddir = ../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
@ -1,307 +0,0 @@
|
||||
# Demonstrate that logical can follow timeline switches.
|
||||
#
|
||||
# Logical replication slots can follow timeline switches but it's
|
||||
# normally not possible to have a logical slot on a replica where
|
||||
# promotion and a timeline switch can occur. The only ways
|
||||
# we can create that circumstance are:
|
||||
#
|
||||
# * By doing a filesystem-level copy of the DB, since pg_basebackup
|
||||
# excludes pg_replslot but we can copy it directly; or
|
||||
#
|
||||
# * by creating a slot directly at the C level on the replica and
|
||||
# advancing it as we go using the low level APIs. It can't be done
|
||||
# from SQL since logical decoding isn't allowed on replicas.
|
||||
#
|
||||
# This module uses the first approach to show that timeline following
|
||||
# on a logical slot works.
|
||||
#
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use PostgresNode;
|
||||
use TestLib;
|
||||
use Test::More tests => 20;
|
||||
use RecursiveCopy;
|
||||
use File::Copy;
|
||||
|
||||
my ($stdout, $stderr, $ret);
|
||||
|
||||
# Initialize master node
|
||||
my $node_master = get_new_node('master');
|
||||
$node_master->init(allows_streaming => 1, has_archiving => 1);
|
||||
$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
|
||||
$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
|
||||
$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
|
||||
$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
|
||||
$node_master->dump_info;
|
||||
$node_master->start;
|
||||
|
||||
diag "Testing logical timeline following with a filesystem-level copy";
|
||||
|
||||
$node_master->safe_psql('postgres',
|
||||
"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
|
||||
);
|
||||
$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
|
||||
$node_master->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('beforebb');");
|
||||
$node_master->safe_psql('postgres', 'CHECKPOINT;');
|
||||
|
||||
my $backup_name = 'b1';
|
||||
$node_master->backup_fs_hot($backup_name);
|
||||
|
||||
my $node_replica = get_new_node('replica');
|
||||
$node_replica->init_from_backup(
|
||||
$node_master, $backup_name,
|
||||
has_streaming => 1,
|
||||
has_restoring => 1);
|
||||
$node_replica->start;
|
||||
|
||||
$node_master->safe_psql('postgres',
|
||||
"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
|
||||
);
|
||||
$node_master->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('afterbb');");
|
||||
$node_master->safe_psql('postgres', 'CHECKPOINT;');
|
||||
|
||||
# Verify that only the before base_backup slot is on the replica
|
||||
$stdout = $node_replica->safe_psql('postgres',
|
||||
'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
|
||||
is($stdout, 'before_basebackup',
|
||||
'Expected to find only slot before_basebackup on replica');
|
||||
|
||||
# Boom, crash
|
||||
$node_master->stop('immediate');
|
||||
|
||||
$node_replica->promote;
|
||||
$node_replica->poll_query_until('postgres',
|
||||
"SELECT NOT pg_is_in_recovery();");
|
||||
|
||||
$node_replica->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('after failover');");
|
||||
|
||||
# Shouldn't be able to read from slot created after base backup
|
||||
($ret, $stdout, $stderr) = $node_replica->psql('postgres',
|
||||
"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
|
||||
);
|
||||
is($ret, 3, 'replaying from after_basebackup slot fails');
|
||||
like(
|
||||
$stderr,
|
||||
qr/replication slot "after_basebackup" does not exist/,
|
||||
'after_basebackup slot missing');
|
||||
|
||||
# Should be able to read from slot created before base backup
|
||||
($ret, $stdout, $stderr) = $node_replica->psql(
|
||||
'postgres',
|
||||
"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
|
||||
timeout => 30);
|
||||
is($ret, 0, 'replay from slot before_basebackup succeeds');
|
||||
is( $stdout, q(BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'beforebb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'afterbb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'after failover'
|
||||
COMMIT), 'decoded expected data from slot before_basebackup');
|
||||
is($stderr, '', 'replay from slot before_basebackup produces no stderr');
|
||||
|
||||
# We don't need the standby anymore
|
||||
$node_replica->teardown_node();
|
||||
|
||||
|
||||
# OK, time to try the same thing again, but this time we'll be using slot
|
||||
# mirroring on the standby and a pg_basebackup of the master.
|
||||
|
||||
diag "Testing logical timeline following with test_slot_timelines module";
|
||||
|
||||
$node_master->start();
|
||||
|
||||
# Clean up after the last test
|
||||
$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
|
||||
is( $node_master->psql(
|
||||
'postgres',
|
||||
'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
|
||||
0,
|
||||
'dropping slots succeeds via pg_drop_replication_slot');
|
||||
|
||||
# Same as before, we'll make one slot before basebackup, one after. This time
|
||||
# the basebackup will be with pg_basebackup so it'll omit both slots, then
|
||||
# we'll use SQL functions provided by the test_slot_timelines test module to sync
|
||||
# them to the replica, do some work, sync them and fail over then test again.
|
||||
# This time we should have both the before- and after-basebackup slots working.
|
||||
|
||||
is( $node_master->psql(
|
||||
'postgres',
|
||||
"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
|
||||
),
|
||||
0,
|
||||
'creating slot before_basebackup succeeds');
|
||||
|
||||
$node_master->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('beforebb');");
|
||||
|
||||
$backup_name = 'b2';
|
||||
$node_master->backup($backup_name);
|
||||
|
||||
is( $node_master->psql(
|
||||
'postgres',
|
||||
"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
|
||||
),
|
||||
0,
|
||||
'creating slot after_basebackup succeeds');
|
||||
|
||||
$node_master->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('afterbb');");
|
||||
|
||||
$node_replica = get_new_node('replica2');
|
||||
$node_replica->init_from_backup(
|
||||
$node_master, $backup_name,
|
||||
has_streaming => 1,
|
||||
has_restoring => 1);
|
||||
|
||||
$node_replica->start;
|
||||
|
||||
# Verify the slots are both absent on the replica
|
||||
$stdout = $node_replica->safe_psql('postgres',
|
||||
'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
|
||||
is($stdout, '', 'No slots exist on the replica');
|
||||
|
||||
# Now do our magic to sync the slot states across. Normally
|
||||
# this would be being done continuously by a bgworker but
|
||||
# we're just doing it by hand for this test. This is exposing
|
||||
# postgres innards to SQL so it's unsafe except for testing.
|
||||
$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;');
|
||||
|
||||
my $slotinfo = $node_master->safe_psql(
|
||||
'postgres',
|
||||
qq{SELECT slot_name, plugin,
|
||||
COALESCE(xmin, '0'), catalog_xmin,
|
||||
restart_lsn, confirmed_flush_lsn
|
||||
FROM pg_replication_slots ORDER BY slot_name}
|
||||
);
|
||||
diag "Copying slots to replica";
|
||||
open my $fh, '<', \$slotinfo or die $!;
|
||||
while (<$fh>)
|
||||
{
|
||||
print $_;
|
||||
chomp $_;
|
||||
my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
|
||||
$confirmed_flush_lsn)
|
||||
= map { "'$_'" } split qr/\|/, $_;
|
||||
|
||||
print
|
||||
"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
|
||||
$node_replica->safe_psql('postgres',
|
||||
"SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);"
|
||||
);
|
||||
$node_replica->safe_psql('postgres',
|
||||
"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
|
||||
);
|
||||
}
|
||||
close $fh or die $!;
|
||||
|
||||
# Now both slots are present on the replica and exactly match the master
|
||||
$stdout = $node_replica->safe_psql('postgres',
|
||||
'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
|
||||
is( $stdout,
|
||||
"after_basebackup\nbefore_basebackup",
|
||||
'both slots now exist on replica');
|
||||
|
||||
$stdout = $node_replica->safe_psql(
|
||||
'postgres',
|
||||
qq{SELECT slot_name, plugin, COALESCE(xmin, '0'), catalog_xmin,
|
||||
restart_lsn, confirmed_flush_lsn
|
||||
FROM pg_replication_slots
|
||||
ORDER BY slot_name});
|
||||
is($stdout, $slotinfo,
|
||||
"slot data read back from replica matches slot data on master");
|
||||
|
||||
# We now have to copy some extra WAL to satisfy the requirements of the oldest
|
||||
# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
|
||||
# so we have to help out. We know the WAL is still retained on the master
|
||||
# because we haven't advanced the slots there.
|
||||
#
|
||||
# Figure out what the oldest segment we need is by looking at the restart_lsn
|
||||
# of the oldest slot.
|
||||
#
|
||||
# It only makes sense to do this once the slots are created on the replica,
|
||||
# otherwise it might just delete the segments again.
|
||||
|
||||
my $oldest_needed_segment = $node_master->safe_psql(
|
||||
'postgres',
|
||||
qq{SELECT pg_xlogfile_name((
|
||||
SELECT restart_lsn
|
||||
FROM pg_replication_slots
|
||||
ORDER BY restart_lsn ASC
|
||||
LIMIT 1
|
||||
));}
|
||||
);
|
||||
|
||||
diag "oldest needed xlog seg is $oldest_needed_segment ";
|
||||
|
||||
# WAL segment names sort lexically so we can just grab everything > than this
|
||||
# segment.
|
||||
opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
|
||||
while (my $seg = readdir $pg_xlog)
|
||||
{
|
||||
next if $seg eq '.' or $seg eq '..';
|
||||
next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
|
||||
diag "copying xlog seg $seg";
|
||||
copy(
|
||||
$node_master->data_dir . "/pg_xlog/" . $seg,
|
||||
$node_replica->data_dir . "/pg_xlog/" . $seg
|
||||
) or die "copy of xlog seg $seg failed: $!";
|
||||
}
|
||||
closedir $pg_xlog;
|
||||
|
||||
# Boom, crash the master
|
||||
$node_master->stop('immediate');
|
||||
|
||||
$node_replica->promote;
|
||||
$node_replica->poll_query_until('postgres',
|
||||
"SELECT NOT pg_is_in_recovery();");
|
||||
|
||||
$node_replica->safe_psql('postgres',
|
||||
"INSERT INTO decoding(blah) VALUES ('after failover');");
|
||||
|
||||
# This time we can read from both slots
|
||||
($ret, $stdout, $stderr) = $node_replica->psql(
|
||||
'postgres',
|
||||
"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
|
||||
timeout => 30);
|
||||
is($ret, 0, 'replay from slot after_basebackup succeeds');
|
||||
is( $stdout, q(BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'afterbb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'after failover'
|
||||
COMMIT), 'decoded expected data from slot after_basebackup');
|
||||
is($stderr, '', 'replay from slot after_basebackup produces no stderr');
|
||||
|
||||
# Should be able to read from slot created before base backup
|
||||
#
|
||||
# This would fail with an error about missing WAL segments if we hadn't
|
||||
# copied extra WAL earlier.
|
||||
($ret, $stdout, $stderr) = $node_replica->psql(
|
||||
'postgres',
|
||||
"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
|
||||
timeout => 30);
|
||||
is($ret, 0, 'replay from slot before_basebackup succeeds');
|
||||
is( $stdout, q(BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'beforebb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'afterbb'
|
||||
COMMIT
|
||||
BEGIN
|
||||
table public.decoding: INSERT: blah[text]:'after failover'
|
||||
COMMIT), 'decoded expected data from slot before_basebackup');
|
||||
is($stderr, '', 'replay from slot before_basebackup produces no stderr');
|
||||
|
||||
($ret, $stdout, $stderr) = $node_replica->psql('postgres',
|
||||
'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
|
||||
is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot');
|
||||
is($stderr, '', 'dropping slots produces no stderr output');
|
||||
|
||||
1;
|
Reference in New Issue
Block a user