mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	TAP test for logical decoding on standby
Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Amit Khandekar <amitdkhan.pg@gmail.com> Author: Craig Ringer <craig@2ndquadrant.com> (in an older version) Author: Andres Freund <andres@anarazel.de> Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
This commit is contained in:
		| @@ -3029,6 +3029,43 @@ $SIG{TERM} = $SIG{INT} = sub { | ||||
|  | ||||
| =pod | ||||
|  | ||||
| =item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname) | ||||
|  | ||||
| Create logical replication slot on given standby | ||||
|  | ||||
| =cut | ||||
|  | ||||
| sub create_logical_slot_on_standby | ||||
| { | ||||
| 	my ($self, $primary, $slot_name, $dbname) = @_; | ||||
| 	my ($stdout, $stderr); | ||||
|  | ||||
| 	my $handle; | ||||
|  | ||||
| 	$handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr); | ||||
|  | ||||
| 	# Once the slot's restart_lsn is determined, the standby looks for | ||||
| 	# xl_running_xacts WAL record from the restart_lsn onwards. First wait | ||||
| 	# until the slot restart_lsn is determined. | ||||
|  | ||||
| 	$self->poll_query_until( | ||||
| 		'postgres', qq[ | ||||
| 		SELECT restart_lsn IS NOT NULL | ||||
| 		FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name' | ||||
| 	]) or die "timed out waiting for logical slot to calculate its restart_lsn"; | ||||
|  | ||||
| 	# Then arrange for the xl_running_xacts record for which pg_recvlogical is | ||||
| 	# waiting. | ||||
| 	$primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()'); | ||||
|  | ||||
| 	$handle->finish(); | ||||
|  | ||||
| 	is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created') | ||||
| 		or die "could not create slot" . $slot_name; | ||||
| } | ||||
|  | ||||
| =pod | ||||
|  | ||||
| =back | ||||
|  | ||||
| =cut | ||||
|   | ||||
| @@ -40,6 +40,7 @@ tests += { | ||||
|       't/032_relfilenode_reuse.pl', | ||||
|       't/033_replay_tsp_drops.pl', | ||||
|       't/034_create_database.pl', | ||||
|       't/035_standby_logical_decoding.pl', | ||||
|     ], | ||||
|   }, | ||||
| } | ||||
|   | ||||
							
								
								
									
										734
									
								
								src/test/recovery/t/035_standby_logical_decoding.pl
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										734
									
								
								src/test/recovery/t/035_standby_logical_decoding.pl
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,734 @@ | ||||
| # Copyright (c) 2023, PostgreSQL Global Development Group | ||||
|  | ||||
| # logical decoding on standby : test logical decoding, | ||||
| # recovery conflict and standby promotion. | ||||
|  | ||||
| use strict; | ||||
| use warnings; | ||||
|  | ||||
| use PostgreSQL::Test::Cluster; | ||||
| use PostgreSQL::Test::Utils; | ||||
| use Test::More; | ||||
|  | ||||
| my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot); | ||||
|  | ||||
| my $node_primary = PostgreSQL::Test::Cluster->new('primary'); | ||||
| my $node_standby = PostgreSQL::Test::Cluster->new('standby'); | ||||
| my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby'); | ||||
| my $default_timeout = $PostgreSQL::Test::Utils::timeout_default; | ||||
| my $res; | ||||
|  | ||||
| # Name for the physical slot on primary | ||||
| my $primary_slotname = 'primary_physical'; | ||||
| my $standby_physical_slotname = 'standby_physical'; | ||||
|  | ||||
| # find $pat in logfile of $node after $off-th byte | ||||
| sub find_in_log | ||||
| { | ||||
| 	my ($node, $pat, $off) = @_; | ||||
|  | ||||
| 	$off = 0 unless defined $off; | ||||
| 	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile); | ||||
| 	return 0 if (length($log) <= $off); | ||||
|  | ||||
| 	$log = substr($log, $off); | ||||
|  | ||||
| 	return $log =~ m/$pat/; | ||||
| } | ||||
|  | ||||
| # Fetch xmin columns from slot's pg_replication_slots row, after waiting for | ||||
| # given boolean condition to be true to ensure we've reached a quiescent state. | ||||
| sub wait_for_xmins | ||||
| { | ||||
| 	my ($node, $slotname, $check_expr) = @_; | ||||
|  | ||||
| 	$node->poll_query_until( | ||||
| 		'postgres', qq[ | ||||
| 		SELECT $check_expr | ||||
| 		FROM pg_catalog.pg_replication_slots | ||||
| 		WHERE slot_name = '$slotname'; | ||||
| 	]) or die "Timed out waiting for slot xmins to advance"; | ||||
| } | ||||
|  | ||||
| # Create the required logical slots on standby. | ||||
| sub create_logical_slots | ||||
| { | ||||
| 	my ($node, $slot_prefix) = @_; | ||||
|  | ||||
| 	my $active_slot = $slot_prefix . 'activeslot'; | ||||
| 	my $inactive_slot = $slot_prefix . 'inactiveslot'; | ||||
| 	$node->create_logical_slot_on_standby($node_primary, qq($inactive_slot), 'testdb'); | ||||
| 	$node->create_logical_slot_on_standby($node_primary, qq($active_slot), 'testdb'); | ||||
| } | ||||
|  | ||||
| # Drop the logical slots on standby. | ||||
| sub drop_logical_slots | ||||
| { | ||||
| 	my ($slot_prefix) = @_; | ||||
| 	my $active_slot = $slot_prefix . 'activeslot'; | ||||
| 	my $inactive_slot = $slot_prefix . 'inactiveslot'; | ||||
|  | ||||
| 	$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$inactive_slot')]); | ||||
| 	$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$active_slot')]); | ||||
| } | ||||
|  | ||||
| # Acquire one of the standby logical slots created by create_logical_slots(). | ||||
| # In case wait is true we are waiting for an active pid on the 'activeslot' slot. | ||||
| # If wait is not true it means we are testing a known failure scenario. | ||||
| sub make_slot_active | ||||
| { | ||||
| 	my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_; | ||||
| 	my $slot_user_handle; | ||||
|  | ||||
| 	my $active_slot = $slot_prefix . 'activeslot'; | ||||
| 	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', qq($active_slot), '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr); | ||||
|  | ||||
| 	if ($wait) | ||||
| 	{ | ||||
| 		# make sure activeslot is in use | ||||
| 		$node->poll_query_until('testdb', | ||||
| 			qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)] | ||||
| 		) or die "slot never became active"; | ||||
| 	} | ||||
| 	return $slot_user_handle; | ||||
| } | ||||
|  | ||||
| # Check pg_recvlogical stderr | ||||
| sub check_pg_recvlogical_stderr | ||||
| { | ||||
| 	my ($slot_user_handle, $check_stderr) = @_; | ||||
| 	my $return; | ||||
|  | ||||
| 	# our client should've terminated in response to the walsender error | ||||
| 	$slot_user_handle->finish; | ||||
| 	$return = $?; | ||||
| 	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero"); | ||||
| 	if ($return) { | ||||
| 		like($stderr, qr/$check_stderr/, 'slot has been invalidated'); | ||||
| 	} | ||||
|  | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
| # Check if all the slots on standby are dropped. These include the 'activeslot' | ||||
| # that was acquired by make_slot_active(), and the non-active 'inactiveslot'. | ||||
| sub check_slots_dropped | ||||
| { | ||||
| 	my ($slot_prefix, $slot_user_handle) = @_; | ||||
|  | ||||
| 	is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped'); | ||||
| 	is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'}, '', 'activeslot on standby dropped'); | ||||
|  | ||||
| 	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery"); | ||||
| } | ||||
|  | ||||
| # Change hot_standby_feedback and check xmin and catalog_xmin values. | ||||
| sub change_hot_standby_feedback_and_wait_for_xmins | ||||
| { | ||||
| 	my ($hsf, $invalidated) = @_; | ||||
|  | ||||
| 	$node_standby->append_conf('postgresql.conf',qq[ | ||||
| 	hot_standby_feedback = $hsf | ||||
| 	]); | ||||
|  | ||||
| 	$node_standby->reload; | ||||
|  | ||||
| 	if ($hsf && $invalidated) | ||||
| 	{ | ||||
| 		# With hot_standby_feedback on, xmin should advance, | ||||
| 		# but catalog_xmin should still remain NULL since there is no logical slot. | ||||
| 		wait_for_xmins($node_primary, $primary_slotname, | ||||
| 			   "xmin IS NOT NULL AND catalog_xmin IS NULL"); | ||||
| 	} | ||||
| 	elsif ($hsf) | ||||
| 	{ | ||||
| 		# With hot_standby_feedback on, xmin and catalog_xmin should advance. | ||||
| 		wait_for_xmins($node_primary, $primary_slotname, | ||||
| 			   "xmin IS NOT NULL AND catalog_xmin IS NOT NULL"); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		# Both should be NULL since hs_feedback is off | ||||
| 		wait_for_xmins($node_primary, $primary_slotname, | ||||
| 			   "xmin IS NULL AND catalog_xmin IS NULL"); | ||||
|  | ||||
| 	} | ||||
| } | ||||
|  | ||||
| # Check conflicting status in pg_replication_slots. | ||||
| sub check_slots_conflicting_status | ||||
| { | ||||
| 	my ($conflicting) = @_; | ||||
|  | ||||
| 	if ($conflicting) | ||||
| 	{ | ||||
| 		$res = $node_standby->safe_psql( | ||||
| 				'postgres', qq( | ||||
| 				 select bool_and(conflicting) from pg_replication_slots;)); | ||||
|  | ||||
| 		is($res, 't', | ||||
| 			"Logical slots are reported as conflicting"); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		$res = $node_standby->safe_psql( | ||||
| 				'postgres', qq( | ||||
| 				select bool_or(conflicting) from pg_replication_slots;)); | ||||
|  | ||||
| 		is($res, 'f', | ||||
| 			"Logical slots are reported as non conflicting"); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| # Drop the slots, re-create them, change hot_standby_feedback, | ||||
| # check xmin and catalog_xmin values, make slot active and reset stat. | ||||
| sub reactive_slots_change_hfs_and_wait_for_xmins | ||||
| { | ||||
| 	my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_; | ||||
|  | ||||
| 	# drop the logical slots | ||||
| 	drop_logical_slots($previous_slot_prefix); | ||||
|  | ||||
| 	# create the logical slots | ||||
| 	create_logical_slots($node_standby, $slot_prefix); | ||||
|  | ||||
| 	change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated); | ||||
|  | ||||
| 	$handle = make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr); | ||||
|  | ||||
| 	# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts | ||||
| 	$node_standby->psql('testdb', q[select pg_stat_reset();]); | ||||
| } | ||||
|  | ||||
| # Check invalidation in the logfile and in pg_stat_database_conflicts | ||||
| sub check_for_invalidation | ||||
| { | ||||
| 	my ($slot_prefix, $log_start, $test_name) = @_; | ||||
|  | ||||
| 	my $active_slot = $slot_prefix . 'activeslot'; | ||||
| 	my $inactive_slot = $slot_prefix . 'inactiveslot'; | ||||
|  | ||||
| 	# message should be issued | ||||
| 	ok( find_in_log( | ||||
| 		$node_standby, | ||||
| 		"invalidating obsolete replication slot \"$inactive_slot\"", $log_start), | ||||
| 		"inactiveslot slot invalidation is logged $test_name"); | ||||
|  | ||||
| 	ok( find_in_log( | ||||
| 		$node_standby, | ||||
| 		"invalidating obsolete replication slot \"$active_slot\"", $log_start), | ||||
| 		"activeslot slot invalidation is logged $test_name"); | ||||
|  | ||||
| 	# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated | ||||
| 	ok( $node_standby->poll_query_until( | ||||
| 		'postgres', | ||||
| 		"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'), | ||||
| 		'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated"; | ||||
| } | ||||
|  | ||||
| ######################## | ||||
| # Initialize primary node | ||||
| ######################## | ||||
|  | ||||
| $node_primary->init(allows_streaming => 1, has_archiving => 1); | ||||
| $node_primary->append_conf('postgresql.conf', q{ | ||||
| wal_level = 'logical' | ||||
| max_replication_slots = 4 | ||||
| max_wal_senders = 4 | ||||
| log_min_messages = 'debug2' | ||||
| log_error_verbosity = verbose | ||||
| }); | ||||
| $node_primary->dump_info; | ||||
| $node_primary->start; | ||||
|  | ||||
| $node_primary->psql('postgres', q[CREATE DATABASE testdb]); | ||||
|  | ||||
| $node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]); | ||||
|  | ||||
| # Check conflicting is NULL for physical slot | ||||
| $res = $node_primary->safe_psql( | ||||
| 		'postgres', qq[ | ||||
| 		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]); | ||||
|  | ||||
| is($res, 't', | ||||
| 	"Physical slot reports conflicting as NULL"); | ||||
|  | ||||
| my $backup_name = 'b1'; | ||||
| $node_primary->backup($backup_name); | ||||
|  | ||||
| # Some tests need to wait for VACUUM to be replayed. But vacuum does not flush | ||||
| # WAL. An insert into flush_wal outside transaction does guarantee a flush. | ||||
| $node_primary->psql('testdb', q[CREATE TABLE flush_wal();]); | ||||
|  | ||||
| ####################### | ||||
| # Initialize standby node | ||||
| ####################### | ||||
|  | ||||
| $node_standby->init_from_backup( | ||||
| 	$node_primary, $backup_name, | ||||
| 	has_streaming => 1, | ||||
| 	has_restoring => 1); | ||||
| $node_standby->append_conf('postgresql.conf', | ||||
| 	qq[primary_slot_name = '$primary_slotname']); | ||||
| $node_standby->start; | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
| $node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]); | ||||
|  | ||||
| ####################### | ||||
| # Initialize cascading standby node | ||||
| ####################### | ||||
| $node_standby->backup($backup_name); | ||||
| $node_cascading_standby->init_from_backup( | ||||
| 	$node_standby, $backup_name, | ||||
| 	has_streaming => 1, | ||||
| 	has_restoring => 1); | ||||
| $node_cascading_standby->append_conf('postgresql.conf', | ||||
| 	qq[primary_slot_name = '$standby_physical_slotname']); | ||||
| $node_cascading_standby->start; | ||||
| $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary); | ||||
|  | ||||
| ################################################## | ||||
| # Test that logical decoding on the standby | ||||
| # behaves correctly. | ||||
| ################################################## | ||||
|  | ||||
| # create the logical slots | ||||
| create_logical_slots($node_standby, 'behaves_ok_'); | ||||
|  | ||||
| $node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]); | ||||
| $node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]); | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| my $result = $node_standby->safe_psql('testdb', | ||||
| 	qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]); | ||||
|  | ||||
| # test if basic decoding works | ||||
| is(scalar(my @foobar = split /^/m, $result), | ||||
| 	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)'); | ||||
|  | ||||
| # Insert some rows and verify that we get the same results from pg_recvlogical | ||||
| # and the SQL interface. | ||||
| $node_primary->safe_psql('testdb', | ||||
| 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;] | ||||
| ); | ||||
|  | ||||
| my $expected = q{BEGIN | ||||
| table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' | ||||
| table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' | ||||
| table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' | ||||
| table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' | ||||
| COMMIT}; | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| my $stdout_sql = $node_standby->safe_psql('testdb', | ||||
| 	qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] | ||||
| ); | ||||
|  | ||||
| is($stdout_sql, $expected, 'got expected output from SQL decoding session'); | ||||
|  | ||||
| my $endpos = $node_standby->safe_psql('testdb', | ||||
| 	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;" | ||||
| ); | ||||
|  | ||||
| # Insert some rows after $endpos, which we won't read. | ||||
| $node_primary->safe_psql('testdb', | ||||
| 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;] | ||||
| ); | ||||
|  | ||||
| $node_primary->wait_for_catchup($node_standby); | ||||
|  | ||||
| my $stdout_recv = $node_standby->pg_recvlogical_upto( | ||||
|     'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout, | ||||
|     'include-xids'     => '0', | ||||
|     'skip-empty-xacts' => '1'); | ||||
| chomp($stdout_recv); | ||||
| is($stdout_recv, $expected, | ||||
|     'got same expected output from pg_recvlogical decoding session'); | ||||
|  | ||||
| $node_standby->poll_query_until('testdb', | ||||
| 	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)" | ||||
| ) or die "slot never became inactive"; | ||||
|  | ||||
| $stdout_recv = $node_standby->pg_recvlogical_upto( | ||||
|     'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout, | ||||
|     'include-xids'     => '0', | ||||
|     'skip-empty-xacts' => '1'); | ||||
| chomp($stdout_recv); | ||||
| is($stdout_recv, '', 'pg_recvlogical acknowledged changes'); | ||||
|  | ||||
| $node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb'); | ||||
|  | ||||
| is( $node_primary->psql( | ||||
|         'otherdb', | ||||
|         "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;" | ||||
|     ), | ||||
|     3, | ||||
|     'replaying logical slot from another database fails'); | ||||
|  | ||||
| ################################################## | ||||
| # Recovery conflict: Invalidate conflicting slots, including in-use slots | ||||
| # Scenario 1: hot_standby_feedback off and vacuum FULL | ||||
| ################################################## | ||||
|  | ||||
| # One way to produce recovery conflict is to create/drop a relation and | ||||
| # launch a vacuum full on pg_class with hot_standby_feedback turned off on | ||||
| # the standby. | ||||
| reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_', 0, 1); | ||||
|  | ||||
| # This should trigger the conflict | ||||
| $node_primary->safe_psql('testdb', qq[ | ||||
|   CREATE TABLE conflict_test(x integer, y text); | ||||
|   DROP TABLE conflict_test; | ||||
|   VACUUM full pg_class; | ||||
|   INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal | ||||
| ]); | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| # Check invalidation in the logfile and in pg_stat_database_conflicts | ||||
| check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class'); | ||||
|  | ||||
| # Verify slots are reported as conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(1); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr); | ||||
|  | ||||
| # We are not able to read from the slot as it has been invalidated | ||||
| check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"vacuum_full_activeslot\""); | ||||
|  | ||||
| # Turn hot_standby_feedback back on | ||||
| change_hot_standby_feedback_and_wait_for_xmins(1,1); | ||||
|  | ||||
| ################################################## | ||||
| # Verify that invalidated logical slots stay invalidated across a restart. | ||||
| ################################################## | ||||
| $node_standby->restart; | ||||
|  | ||||
| # Verify slots are reported as conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(1); | ||||
|  | ||||
| ################################################## | ||||
| # Verify that invalidated logical slots do not lead to retaining WAL | ||||
| ################################################## | ||||
| # XXXXX TODO | ||||
|  | ||||
| ################################################## | ||||
| # Recovery conflict: Invalidate conflicting slots, including in-use slots | ||||
| # Scenario 2: conflict due to row removal with hot_standby_feedback off. | ||||
| ################################################## | ||||
|  | ||||
| # get the position to search from in the standby logfile | ||||
| my $logstart = -s $node_standby->logfile; | ||||
|  | ||||
| # One way to produce recovery conflict is to create/drop a relation and | ||||
| # launch a vacuum on pg_class with hot_standby_feedback turned off on the standby. | ||||
| reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_', 0, 1); | ||||
|  | ||||
| # This should trigger the conflict | ||||
| $node_primary->safe_psql('testdb', qq[ | ||||
|   CREATE TABLE conflict_test(x integer, y text); | ||||
|   DROP TABLE conflict_test; | ||||
|   VACUUM pg_class; | ||||
|   INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal | ||||
| ]); | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| # Check invalidation in the logfile and in pg_stat_database_conflicts | ||||
| check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class'); | ||||
|  | ||||
| # Verify slots are reported as conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(1); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr); | ||||
|  | ||||
| # We are not able to read from the slot as it has been invalidated | ||||
| check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"row_removal_activeslot\""); | ||||
|  | ||||
| ################################################## | ||||
| # Recovery conflict: Same as Scenario 2 but on a shared catalog table | ||||
| # Scenario 3: conflict due to row removal with hot_standby_feedback off. | ||||
| ################################################## | ||||
|  | ||||
| # get the position to search from in the standby logfile | ||||
| $logstart = -s $node_standby->logfile; | ||||
|  | ||||
| # One way to produce recovery conflict is to create/drop a relation and | ||||
| # launch a vacuum on pg_class with hot_standby_feedback turned off on the standby. | ||||
| reactive_slots_change_hfs_and_wait_for_xmins('row_removal_', 'shared_row_removal_', 0, 1); | ||||
|  | ||||
| # Trigger the conflict | ||||
| diag $node_primary->safe_psql('testdb', qq[ | ||||
|   CREATE ROLE create_trash; | ||||
|   DROP ROLE create_trash; | ||||
|   VACUUM pg_authid; | ||||
|   INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal | ||||
| ]); | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| # Check invalidation in the logfile and in pg_stat_database_conflicts | ||||
| check_for_invalidation('shared_row_removal_', $logstart, 'with vacuum on pg_authid'); | ||||
|  | ||||
| # Verify slots are reported as conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(1); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout, \$stderr); | ||||
|  | ||||
| # We are not able to read from the slot as it has been invalidated | ||||
| check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"shared_row_removal_activeslot\""); | ||||
|  | ||||
| ################################################## | ||||
| # Recovery conflict: Same as Scenario 2 but on a non catalog table | ||||
| # Scenario 4: No conflict expected. | ||||
| ################################################## | ||||
|  | ||||
| # get the position to search from in the standby logfile | ||||
| $logstart = -s $node_standby->logfile; | ||||
|  | ||||
| reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_', 'no_conflict_', 0, 1); | ||||
|  | ||||
| # This should not trigger a conflict | ||||
| $node_primary->safe_psql('testdb', qq[ | ||||
|   CREATE TABLE conflict_test(x integer, y text); | ||||
|   INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s; | ||||
|   UPDATE conflict_test set x=1, y=1; | ||||
|   VACUUM conflict_test; | ||||
|   INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal | ||||
| ]); | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| # message should not be issued | ||||
| ok( !find_in_log( | ||||
|    $node_standby, | ||||
|   "invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart), | ||||
|   'inactiveslot slot invalidation is not logged with vacuum on conflict_test'); | ||||
|  | ||||
| ok( !find_in_log( | ||||
|    $node_standby, | ||||
|   "invalidating obsolete slot \"no_conflict_activeslot\"", $logstart), | ||||
|   'activeslot slot invalidation is not logged with vacuum on conflict_test'); | ||||
|  | ||||
| # Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated | ||||
| ok( $node_standby->poll_query_until( | ||||
| 	'postgres', | ||||
| 	"select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'", 't'), | ||||
| 	'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated"; | ||||
|  | ||||
| # Verify slots are reported as non conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(0); | ||||
|  | ||||
| # Turn hot_standby_feedback back on | ||||
| change_hot_standby_feedback_and_wait_for_xmins(1, 0); | ||||
|  | ||||
| # Restart the standby node to ensure no slots are still active | ||||
| $node_standby->restart; | ||||
|  | ||||
| ################################################## | ||||
| # Recovery conflict: Invalidate conflicting slots, including in-use slots | ||||
| # Scenario 4: conflict due to on-access pruning. | ||||
| ################################################## | ||||
|  | ||||
| # get the position to search from in the standby logfile | ||||
| $logstart = -s $node_standby->logfile; | ||||
|  | ||||
| # One way to produce recovery conflict is to trigger an on-access pruning | ||||
| # on a relation marked as user_catalog_table. | ||||
| reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0, 0); | ||||
|  | ||||
| # This should trigger the conflict | ||||
| $node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]); | ||||
| $node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]); | ||||
| $node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]); | ||||
| $node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]); | ||||
| $node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]); | ||||
| $node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]); | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| # Check invalidation in the logfile and in pg_stat_database_conflicts | ||||
| check_for_invalidation('pruning_', $logstart, 'with on-access pruning'); | ||||
|  | ||||
| # Verify slots are reported as conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(1); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr); | ||||
|  | ||||
| # We are not able to read from the slot as it has been invalidated | ||||
| check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"pruning_activeslot\""); | ||||
|  | ||||
| # Turn hot_standby_feedback back on | ||||
| change_hot_standby_feedback_and_wait_for_xmins(1, 1); | ||||
|  | ||||
| ################################################## | ||||
| # Recovery conflict: Invalidate conflicting slots, including in-use slots | ||||
| # Scenario 5: incorrect wal_level on primary. | ||||
| ################################################## | ||||
|  | ||||
| # get the position to search from in the standby logfile | ||||
| $logstart = -s $node_standby->logfile; | ||||
|  | ||||
| # drop the logical slots | ||||
| drop_logical_slots('pruning_'); | ||||
|  | ||||
| # create the logical slots | ||||
| create_logical_slots($node_standby, 'wal_level_'); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr); | ||||
|  | ||||
| # reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts | ||||
| $node_standby->psql('testdb', q[select pg_stat_reset();]); | ||||
|  | ||||
| # Make primary wal_level replica. This will trigger slot conflict. | ||||
| $node_primary->append_conf('postgresql.conf',q[ | ||||
| wal_level = 'replica' | ||||
| ]); | ||||
| $node_primary->restart; | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| # Check invalidation in the logfile and in pg_stat_database_conflicts | ||||
| check_for_invalidation('wal_level_', $logstart, 'due to wal_level'); | ||||
|  | ||||
| # Verify slots are reported as conflicting in pg_replication_slots | ||||
| check_slots_conflicting_status(1); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); | ||||
| # We are not able to read from the slot as it requires wal_level at least logical on the primary server | ||||
| check_pg_recvlogical_stderr($handle, "logical decoding on a standby requires wal_level to be at least logical on the primary"); | ||||
|  | ||||
| # Restore primary wal_level | ||||
| $node_primary->append_conf('postgresql.conf',q[ | ||||
| wal_level = 'logical' | ||||
| ]); | ||||
| $node_primary->restart; | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); | ||||
| # as the slot has been invalidated we should not be able to read | ||||
| check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"wal_level_activeslot\""); | ||||
|  | ||||
| ################################################## | ||||
| # DROP DATABASE should drops it's slots, including active slots. | ||||
| ################################################## | ||||
|  | ||||
| # drop the logical slots | ||||
| drop_logical_slots('wal_level_'); | ||||
|  | ||||
| # create the logical slots | ||||
| create_logical_slots($node_standby, 'drop_db_'); | ||||
|  | ||||
| $handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr); | ||||
|  | ||||
| # Create a slot on a database that would not be dropped. This slot should not | ||||
| # get dropped. | ||||
| $node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres'); | ||||
|  | ||||
| # dropdb on the primary to verify slots are dropped on standby | ||||
| $node_primary->safe_psql('postgres', q[DROP DATABASE testdb]); | ||||
|  | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
|  | ||||
| is($node_standby->safe_psql('postgres', | ||||
| 	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f', | ||||
| 	'database dropped on standby'); | ||||
|  | ||||
| check_slots_dropped('drop_db', $handle); | ||||
|  | ||||
| is($node_standby->slot('otherslot')->{'slot_type'}, 'logical', | ||||
| 	'otherslot on standby not dropped'); | ||||
|  | ||||
| # Cleanup : manually drop the slot that was not dropped. | ||||
| $node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]); | ||||
|  | ||||
| ################################################## | ||||
| # Test standby promotion and logical decoding behavior | ||||
| # after the standby gets promoted. | ||||
| ################################################## | ||||
|  | ||||
| $node_standby->reload; | ||||
|  | ||||
| $node_primary->psql('postgres', q[CREATE DATABASE testdb]); | ||||
| $node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]); | ||||
|  | ||||
| # create the logical slots | ||||
| create_logical_slots($node_standby, 'promotion_'); | ||||
|  | ||||
| # create the logical slots on the cascading standby too | ||||
| create_logical_slots($node_cascading_standby, 'promotion_'); | ||||
|  | ||||
| # Make slots actives | ||||
| $handle = make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr); | ||||
| my $cascading_handle = make_slot_active($node_cascading_standby, 'promotion_', 1, \$cascading_stdout, \$cascading_stderr); | ||||
|  | ||||
| # Insert some rows before the promotion | ||||
| $node_primary->safe_psql('testdb', | ||||
| 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;] | ||||
| ); | ||||
|  | ||||
| # Wait for both standbys to catchup | ||||
| $node_primary->wait_for_replay_catchup($node_standby); | ||||
| $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary); | ||||
|  | ||||
| # promote | ||||
| $node_standby->promote; | ||||
|  | ||||
| # insert some rows on promoted standby | ||||
| $node_standby->safe_psql('testdb', | ||||
| 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;] | ||||
| ); | ||||
|  | ||||
| # Wait for the cascading standby to catchup | ||||
| $node_standby->wait_for_replay_catchup($node_cascading_standby); | ||||
|  | ||||
| $expected = q{BEGIN | ||||
| table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' | ||||
| table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' | ||||
| table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' | ||||
| table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' | ||||
| COMMIT | ||||
| BEGIN | ||||
| table public.decoding_test: INSERT: x[integer]:5 y[text]:'5' | ||||
| table public.decoding_test: INSERT: x[integer]:6 y[text]:'6' | ||||
| table public.decoding_test: INSERT: x[integer]:7 y[text]:'7' | ||||
| COMMIT}; | ||||
|  | ||||
| # check that we are decoding pre and post promotion inserted rows | ||||
| $stdout_sql = $node_standby->safe_psql('testdb', | ||||
| 	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] | ||||
| ); | ||||
|  | ||||
| is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby'); | ||||
|  | ||||
| # check that we are decoding pre and post promotion inserted rows | ||||
| # with pg_recvlogical that has started before the promotion | ||||
| my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); | ||||
|  | ||||
| ok( pump_until( | ||||
|         $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s), | ||||
|     'got 2 COMMIT from pg_recvlogical output'); | ||||
|  | ||||
| chomp($stdout); | ||||
| is($stdout, $expected, | ||||
|     'got same expected output from pg_recvlogical decoding session'); | ||||
|  | ||||
| # check that we are decoding pre and post promotion inserted rows on the cascading standby | ||||
| $stdout_sql = $node_cascading_standby->safe_psql('testdb', | ||||
| 	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] | ||||
| ); | ||||
|  | ||||
| is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby'); | ||||
|  | ||||
| # check that we are decoding pre and post promotion inserted rows | ||||
| # with pg_recvlogical that has started before the promotion on the cascading standby | ||||
| ok( pump_until( | ||||
|         $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s), | ||||
|     'got 2 COMMIT from pg_recvlogical output'); | ||||
|  | ||||
| chomp($cascading_stdout); | ||||
| is($cascading_stdout, $expected, | ||||
|     'got same expected output from pg_recvlogical decoding session on cascading standby'); | ||||
|  | ||||
| done_testing(); | ||||
		Reference in New Issue
	
	Block a user