1
0
mirror of https://github.com/postgres/postgres.git synced 2026-01-26 09:41:40 +03:00

Add test for pg_recvlogical reconnection behavior.

This commit adds a test to verify that data already received and flushed by
pg_recvlogical is not streamed again even after the connection is lost,
reestablished, and logical replication is restarted.

Author: Mircea Cadariu <cadariu.mircea@gmail.com>
Co-authored-by: Fujii Masao <masao.fujii@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/CAHGQGwFeTymZQ7RLvMU6WuDGar8bUQCazg=VOfA-9GeBkg-FzA@mail.gmail.com
This commit is contained in:
Fujii Masao
2026-01-16 12:36:34 +09:00
parent 0b10969db6
commit d89b1d8175

View File

@@ -6,6 +6,7 @@ use warnings FATAL => 'all';
use PostgreSQL::Test::Utils;
use PostgreSQL::Test::Cluster;
use Test::More;
use Config;
program_help_ok('pg_recvlogical');
program_version_ok('pg_recvlogical');
@@ -151,4 +152,97 @@ my $result = $node->safe_psql('postgres',
);
is($result, 't', "failover is enabled for the new slot");
# Test that when pg_recvlogical reconnects, it does not write duplicate
# records to the output file
my $outfile = $node->basedir . '/reconnect.out';
$node->command_ok(
[
'pg_recvlogical',
'--slot' => 'reconnect_test',
'--dbname' => $node->connstr('postgres'),
'--create-slot',
],
'slot created for reconnection test');
# Insert the first record for this test
$node->safe_psql('postgres', 'INSERT INTO test_table VALUES (1)');
my @pg_recvlogical_cmd = (
'pg_recvlogical',
'--slot' => 'reconnect_test',
'--dbname' => $node->connstr('postgres'),
'--start',
'--file' => $outfile,
'--fsync-interval' => '1',
'--status-interval' => '100',
'--verbose');
# On Windows, specify --endpos so pg_recvlogical can terminate, since
# signals cannot be used. Use the current LSN plus 32MB as endpos, which
# would be sufficient to cover the WAL generated by the test INSERTs.
if ($Config{osname} eq 'MSWin32')
{
$nextlsn = $node->safe_psql('postgres',
"SELECT pg_current_wal_insert_lsn() + pg_size_bytes('32MB')");
chomp($nextlsn);
push(@pg_recvlogical_cmd, '--endpos' => $nextlsn);
}
my ($stdout, $stderr);
my $recv = IPC::Run::start(
[@pg_recvlogical_cmd],
'>' => \$stdout,
'2>' => \$stderr);
# Wait for pg_recvlogical to receive and write the first INSERT
my $first_ins = wait_for_file($outfile, qr/INSERT/);
# Terminate the walsender to force pg_recvlogical to reconnect
my $backend_pid = $node->safe_psql('postgres',
"SELECT active_pid FROM pg_replication_slots WHERE slot_name = 'reconnect_test'"
);
$node->safe_psql('postgres', "SELECT pg_terminate_backend($backend_pid)");
# Wait for pg_recvlogical to reconnect
$node->poll_query_until('postgres',
"SELECT active_pid IS NOT NULL AND active_pid != $backend_pid FROM pg_replication_slots WHERE slot_name = 'reconnect_test'"
) or die "Timed out while waiting for pg_recvlogical to reconnect";
# Insert the second record for this test
$node->safe_psql('postgres', 'INSERT INTO test_table VALUES (2)');
# Wait for pg_recvlogical to receive and write the second INSERT
wait_for_file($outfile, qr/INSERT/, $first_ins);
# Terminate pg_recvlogical by generating WAL until the current position
# reaches the specified --endpos on Windows, or by sending a TERM signal
# on other platforms.
if ($Config{osname} eq 'MSWin32')
{
$node->poll_query_until('postgres',
"SELECT pg_switch_wal() >= '$nextlsn' FROM pg_logical_emit_message(false, 'test', 'test')"
) or die "Timed out while waiting for pg_recvlogical to end";
}
else
{
$recv->signal('TERM');
}
$recv->finish();
my $outfiledata = slurp_file("$outfile");
my $count = (() = $outfiledata =~ /INSERT/g);
cmp_ok($count, '==', 2,
'pg_recvlogical has received and written two INSERTs');
$node->command_ok(
[
'pg_recvlogical',
'--slot' => 'reconnect_test',
'--dbname' => $node->connstr('postgres'),
'--drop-slot'
],
'reconnect_test slot dropped');
done_testing();