mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Refactor background psql TAP functions
This breaks out the background and interactive psql functionality into a
new class, PostgreSQL::Test::BackgroundPsql.  Sessions are still initiated
via PostgreSQL::Test::Cluster, but once started they can be manipulated by
the new helper functions which intend to make querying easier.  A sample
session for a command which can be expected to finish at a later time can
be seen below.
  my $session = $node->background_psql('postgres');
  $bsession->query_until(qr/start/, q(
    \echo start
	CREATE INDEX CONCURRENTLY idx ON t(a);
  ));
  $bsession->quit;
Patch by Andres Freund with some additional hacking by me.
Author: Andres Freund <andres@anarazel.de>
Reviewed-by: Andrew Dunstan <andrew@dunslane.net>
Discussion: https://postgr.es/m/20230130194350.zj5v467x4jgqt3d6@awork3.anarazel.de
			
			
This commit is contained in:
		| @@ -36,63 +36,46 @@ $node->safe_psql('postgres', q(CREATE TABLE tbl(i int))); | |||||||
| # statements. | # statements. | ||||||
| # | # | ||||||
|  |  | ||||||
| my $main_in    = ''; | my $main_h = $node->background_psql('postgres'); | ||||||
| my $main_out   = ''; |  | ||||||
| my $main_timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); |  | ||||||
|  |  | ||||||
| my $main_h = | $main_h->query_safe(q( | ||||||
|   $node->background_psql('postgres', \$main_in, \$main_out, |  | ||||||
| 	$main_timer, on_error_stop => 1); |  | ||||||
| $main_in .= q( |  | ||||||
| BEGIN; | BEGIN; | ||||||
| INSERT INTO tbl VALUES(0); | INSERT INTO tbl VALUES(0); | ||||||
| \echo syncpoint1 | )); | ||||||
| ); |  | ||||||
| pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired; |  | ||||||
|  |  | ||||||
| my $cic_in    = ''; | my $cic_h = $node->background_psql('postgres'); | ||||||
| my $cic_out   = ''; |  | ||||||
| my $cic_timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); | $cic_h->query_until(qr/start/, q( | ||||||
| my $cic_h = |  | ||||||
|   $node->background_psql('postgres', \$cic_in, \$cic_out, |  | ||||||
| 	$cic_timer, on_error_stop => 1); |  | ||||||
| $cic_in .= q( |  | ||||||
| \echo start | \echo start | ||||||
| CREATE INDEX CONCURRENTLY idx ON tbl(i); | CREATE INDEX CONCURRENTLY idx ON tbl(i); | ||||||
| ); | )); | ||||||
| pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired; |  | ||||||
|  |  | ||||||
| $main_in .= q( | $main_h->query_safe(q( | ||||||
| PREPARE TRANSACTION 'a'; | PREPARE TRANSACTION 'a'; | ||||||
| ); | )); | ||||||
|  |  | ||||||
| $main_in .= q( | $main_h->query_safe(q( | ||||||
| BEGIN; | BEGIN; | ||||||
| INSERT INTO tbl VALUES(0); | INSERT INTO tbl VALUES(0); | ||||||
| \echo syncpoint2 | )); | ||||||
| ); |  | ||||||
| pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired; |  | ||||||
|  |  | ||||||
| $node->safe_psql('postgres', q(COMMIT PREPARED 'a';)); | $node->safe_psql('postgres', q(COMMIT PREPARED 'a';)); | ||||||
|  |  | ||||||
| $main_in .= q( | $main_h->query_safe(q( | ||||||
| PREPARE TRANSACTION 'b'; | PREPARE TRANSACTION 'b'; | ||||||
| BEGIN; | BEGIN; | ||||||
| INSERT INTO tbl VALUES(0); | INSERT INTO tbl VALUES(0); | ||||||
| \echo syncpoint3 | )); | ||||||
| ); |  | ||||||
| pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired; |  | ||||||
|  |  | ||||||
| $node->safe_psql('postgres', q(COMMIT PREPARED 'b';)); | $node->safe_psql('postgres', q(COMMIT PREPARED 'b';)); | ||||||
|  |  | ||||||
| $main_in .= q( | $main_h->query_safe(q( | ||||||
| PREPARE TRANSACTION 'c'; | PREPARE TRANSACTION 'c'; | ||||||
| COMMIT PREPARED 'c'; | COMMIT PREPARED 'c'; | ||||||
| ); | )); | ||||||
| $main_h->pump_nb; |  | ||||||
|  |  | ||||||
| $main_h->finish; | $main_h->quit; | ||||||
| $cic_h->finish; | $cic_h->quit; | ||||||
|  |  | ||||||
| $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true))); | $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true))); | ||||||
| is($result, '0', 'bt_index_check after overlapping 2PC'); | is($result, '0', 'bt_index_check after overlapping 2PC'); | ||||||
| @@ -113,22 +96,15 @@ PREPARE TRANSACTION 'persists_forever'; | |||||||
| )); | )); | ||||||
| $node->restart; | $node->restart; | ||||||
|  |  | ||||||
| my $reindex_in  = ''; | my $reindex_h = $node->background_psql('postgres'); | ||||||
| my $reindex_out = ''; | $reindex_h->query_until(qr/start/, q( | ||||||
| my $reindex_timer = |  | ||||||
|   IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); |  | ||||||
| my $reindex_h = |  | ||||||
|   $node->background_psql('postgres', \$reindex_in, \$reindex_out, |  | ||||||
| 	$reindex_timer, on_error_stop => 1); |  | ||||||
| $reindex_in .= q( |  | ||||||
| \echo start | \echo start | ||||||
| DROP INDEX CONCURRENTLY idx; | DROP INDEX CONCURRENTLY idx; | ||||||
| CREATE INDEX CONCURRENTLY idx ON tbl(i); | CREATE INDEX CONCURRENTLY idx ON tbl(i); | ||||||
| ); | )); | ||||||
| pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired; |  | ||||||
|  |  | ||||||
| $node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'"); | $node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'"); | ||||||
| $reindex_h->finish; | $reindex_h->quit; | ||||||
| $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true))); | $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true))); | ||||||
| is($result, '0', 'bt_index_check after 2PC and restart'); | is($result, '0', 'bt_index_check after 2PC and restart'); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -7,7 +7,6 @@ use warnings; | |||||||
| use PostgreSQL::Test::Cluster; | use PostgreSQL::Test::Cluster; | ||||||
| use PostgreSQL::Test::Utils; | use PostgreSQL::Test::Utils; | ||||||
| use Test::More; | use Test::More; | ||||||
| use IPC::Run qw(pump finish timer); |  | ||||||
| use Data::Dumper; | use Data::Dumper; | ||||||
|  |  | ||||||
| # Do nothing unless Makefile has told us that the build is --with-readline. | # Do nothing unless Makefile has told us that the build is --with-readline. | ||||||
| @@ -92,14 +91,7 @@ print $FH "other stuff\n"; | |||||||
| close $FH; | close $FH; | ||||||
|  |  | ||||||
| # fire up an interactive psql session | # fire up an interactive psql session | ||||||
| my $in  = ''; | my $h = $node->interactive_psql('postgres'); | ||||||
| my $out = ''; |  | ||||||
|  |  | ||||||
| my $timer = timer($PostgreSQL::Test::Utils::timeout_default); |  | ||||||
|  |  | ||||||
| my $h = $node->interactive_psql('postgres', \$in, \$out, $timer); |  | ||||||
|  |  | ||||||
| like($out, qr/psql/, "print startup banner"); |  | ||||||
|  |  | ||||||
| # Simple test case: type something and see if psql responds as expected | # Simple test case: type something and see if psql responds as expected | ||||||
| sub check_completion | sub check_completion | ||||||
| @@ -109,15 +101,12 @@ sub check_completion | |||||||
| 	# report test failures from caller location | 	# report test failures from caller location | ||||||
| 	local $Test::Builder::Level = $Test::Builder::Level + 1; | 	local $Test::Builder::Level = $Test::Builder::Level + 1; | ||||||
|  |  | ||||||
| 	# reset output collector |  | ||||||
| 	$out = ""; |  | ||||||
| 	# restart per-command timer | 	# restart per-command timer | ||||||
| 	$timer->start($PostgreSQL::Test::Utils::timeout_default); | 	$h->{timeout}->start($PostgreSQL::Test::Utils::timeout_default); | ||||||
| 	# send the data to be sent |  | ||||||
| 	$in .= $send; | 	# send the data to be sent and wait for its result | ||||||
| 	# wait ... | 	my $out = $h->query_until($pattern, $send); | ||||||
| 	pump $h until ($out =~ $pattern || $timer->is_expired); | 	my $okay = ($out =~ $pattern && !$h->{timeout}->is_expired); | ||||||
| 	my $okay = ($out =~ $pattern && !$timer->is_expired); |  | ||||||
| 	ok($okay, $annotation); | 	ok($okay, $annotation); | ||||||
| 	# for debugging, log actual output if it didn't match | 	# for debugging, log actual output if it didn't match | ||||||
| 	local $Data::Dumper::Terse = 1; | 	local $Data::Dumper::Terse = 1; | ||||||
| @@ -451,10 +440,7 @@ check_completion( | |||||||
| clear_line(); | clear_line(); | ||||||
|  |  | ||||||
| # send psql an explicit \q to shut it down, else pty won't close properly | # send psql an explicit \q to shut it down, else pty won't close properly | ||||||
| $timer->start($PostgreSQL::Test::Utils::timeout_default); | $h->quit or die "psql returned $?"; | ||||||
| $in .= "\\q\n"; |  | ||||||
| finish $h or die "psql returned $?"; |  | ||||||
| $timer->reset; |  | ||||||
|  |  | ||||||
| # done | # done | ||||||
| $node->stop; | $node->stop; | ||||||
|   | |||||||
							
								
								
									
										299
									
								
								src/test/perl/PostgreSQL/Test/BackgroundPsql.pm
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										299
									
								
								src/test/perl/PostgreSQL/Test/BackgroundPsql.pm
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,299 @@ | |||||||
|  |  | ||||||
|  | # Copyright (c) 2021-2023, PostgreSQL Global Development Group | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =head1 NAME | ||||||
|  |  | ||||||
|  | PostgreSQL::Test::BackgroundPsql - class for controlling background psql processes | ||||||
|  |  | ||||||
|  | =head1 SYNOPSIS | ||||||
|  |  | ||||||
|  |   use PostgreSQL::Test::Cluster; | ||||||
|  |  | ||||||
|  |   my $node = PostgreSQL::Test::Cluster->new('mynode'); | ||||||
|  |  | ||||||
|  |   # Create a data directory with initdb | ||||||
|  |   $node->init(); | ||||||
|  |  | ||||||
|  |   # Start the PostgreSQL server | ||||||
|  |   $node->start(); | ||||||
|  |  | ||||||
|  |   # Create and start an interactive psql session | ||||||
|  |   my $isession = $node->interactive_psql('postgres'); | ||||||
|  |   # Apply timeout per query rather than per session | ||||||
|  |   $isession->set_query_timer_restart(); | ||||||
|  |   # Run a query and get the output as seen by psql | ||||||
|  |   my $ret = $isession->query("SELECT 1"); | ||||||
|  |   # Run a backslash command and wait until the prompt returns | ||||||
|  |   $isession->query_until(qr/postgres #/, "\\d foo\n"); | ||||||
|  |   # Close the session and exit psql | ||||||
|  |   $isession->quit; | ||||||
|  |  | ||||||
|  |   # Create and start a background psql session | ||||||
|  |   my $bsession = $node->background_psql('postgres'); | ||||||
|  |  | ||||||
|  |   # Run a query which is guaranteed to not return in case it fails | ||||||
|  |   $bsession->query_safe("SELECT 1"); | ||||||
|  |   # Initiate a command which can be expected to terminate at a later stage | ||||||
|  |   $bsession->query_until(qr/start/, q( | ||||||
|  |     \echo start | ||||||
|  | 	CREATE INDEX CONCURRENTLY idx ON t(a); | ||||||
|  |   )); | ||||||
|  |   # Close the session and exit psql | ||||||
|  |   $bsession->quit; | ||||||
|  |  | ||||||
|  | =head1 DESCRIPTION | ||||||
|  |  | ||||||
|  | PostgreSQL::Test::BackgroundPsql contains functionality for controlling | ||||||
|  | a background or interactive psql session operating on a PostgreSQL node | ||||||
|  | initiated by PostgreSQL::Test::Cluster. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | package PostgreSQL::Test::BackgroundPsql; | ||||||
|  |  | ||||||
|  | use strict; | ||||||
|  | use warnings; | ||||||
|  |  | ||||||
|  | use Carp; | ||||||
|  | use Config; | ||||||
|  | use IPC::Run; | ||||||
|  | use PostgreSQL::Test::Utils qw(pump_until); | ||||||
|  | use Test::More; | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =head1 METHODS | ||||||
|  |  | ||||||
|  | =over | ||||||
|  |  | ||||||
|  | =item PostgreSQL::Test::BackroundPsql->new(interactive, @params) | ||||||
|  |  | ||||||
|  | Builds a new object of class C<PostgreSQL::Test::BackgroundPsql> for either | ||||||
|  | an interactive or background session and starts it. If C<interactive> is | ||||||
|  | true then a PTY will be attached. C<psql_params> should contain the full | ||||||
|  | command to run psql with all desired parameters and a complete connection | ||||||
|  | string. For C<interactive> sessions, IO::Pty is required. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub new | ||||||
|  | { | ||||||
|  | 	my $class = shift; | ||||||
|  | 	my ($interactive, $psql_params) = @_; | ||||||
|  | 	my $psql = {'stdin' => '', 'stdout' => '', 'stderr' => '', 'query_timer_restart' => undef}; | ||||||
|  | 	my $run; | ||||||
|  |  | ||||||
|  | 	# This constructor should only be called from PostgreSQL::Test::Cluster | ||||||
|  |     my ($package, $file, $line) = caller; | ||||||
|  |     die "Forbidden caller of constructor: package: $package, file: $file:$line" | ||||||
|  | 	  unless $package->isa('PostgreSQL::Test::Cluster'); | ||||||
|  |  | ||||||
|  | 	$psql->{timeout} = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); | ||||||
|  |  | ||||||
|  | 	if ($interactive) | ||||||
|  | 	{ | ||||||
|  | 		$run = IPC::Run::start $psql_params, | ||||||
|  | 		  '<pty<', \$psql->{stdin}, '>pty>', \$psql->{stdout}, '2>', \$psql->{stderr}, | ||||||
|  | 		  $psql->{timeout}; | ||||||
|  | 	} | ||||||
|  | 	else | ||||||
|  | 	{ | ||||||
|  | 		$run = IPC::Run::start $psql_params, | ||||||
|  | 		  '<', \$psql->{stdin}, '>', \$psql->{stdout}, '2>', \$psql->{stderr}, | ||||||
|  | 		  $psql->{timeout}; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	$psql->{run} = $run; | ||||||
|  |  | ||||||
|  | 	my $self = bless $psql, $class; | ||||||
|  |  | ||||||
|  | 	$self->_wait_connect(); | ||||||
|  |  | ||||||
|  | 	return $self; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | # Internal routine for awaiting psql starting up and being ready to consume | ||||||
|  | # input. | ||||||
|  | sub _wait_connect | ||||||
|  | { | ||||||
|  | 	my ($self) = @_; | ||||||
|  |  | ||||||
|  | 	# Request some output, and pump until we see it.  This means that psql | ||||||
|  | 	# connection failures are caught here, relieving callers of the need to | ||||||
|  | 	# handle those.  (Right now, we have no particularly good handling for | ||||||
|  | 	# errors anyway, but that might be added later.) | ||||||
|  | 	my $banner = "background_psql: ready"; | ||||||
|  | 	$self->{stdin} .= "\\echo $banner\n"; | ||||||
|  | 	$self->{run}->pump() until $self->{stdout} =~ /$banner/ || $self->{timeout}->is_expired; | ||||||
|  | 	$self->{stdout} = ''; # clear out banner | ||||||
|  |  | ||||||
|  | 	die "psql startup timed out" if $self->{timeout}->is_expired; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =item $session->quit | ||||||
|  |  | ||||||
|  | Close the session and clean up resources. Each test run must be closed with | ||||||
|  | C<quit>. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub quit | ||||||
|  | { | ||||||
|  | 	my ($self) = @_; | ||||||
|  |  | ||||||
|  | 	$self->{stdin} .= "\\q\n"; | ||||||
|  |  | ||||||
|  | 	return $self->{run}->finish; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =item $session->reconnect_and_clear | ||||||
|  |  | ||||||
|  | Terminate the current session and connect again. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub reconnect_and_clear | ||||||
|  | { | ||||||
|  | 	my ($self) = @_; | ||||||
|  |  | ||||||
|  | 	# If psql isn't dead already, tell it to quit as \q, when already dead, | ||||||
|  | 	# causes IPC::Run to unhelpfully error out with "ack Broken pipe:". | ||||||
|  | 	$self->{run}->pump_nb(); | ||||||
|  | 	if ($self->{run}->pumpable()) | ||||||
|  | 	{ | ||||||
|  | 		$self->{stdin} .= "\\q\n"; | ||||||
|  | 	} | ||||||
|  | 	$self->{run}->finish; | ||||||
|  |  | ||||||
|  | 	# restart | ||||||
|  | 	$self->{run}->run(); | ||||||
|  | 	$self->{stdin}  = ''; | ||||||
|  | 	$self->{stdout} = ''; | ||||||
|  |  | ||||||
|  | 	$self->_wait_connect() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =item $session->query() | ||||||
|  |  | ||||||
|  | Executes a query in the current session and returns the output in scalar | ||||||
|  | context and (output, error) in list context where error is 1 in case there | ||||||
|  | was output generated on stderr when executing the query. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub query | ||||||
|  | { | ||||||
|  | 	my ($self, $query) = @_; | ||||||
|  | 	my $ret; | ||||||
|  | 	my $output; | ||||||
|  | 	local $Test::Builder::Level = $Test::Builder::Level + 1; | ||||||
|  |  | ||||||
|  | 	note "issuing query via background psql: $query"; | ||||||
|  |  | ||||||
|  | 	$self->{timeout}->start() if (defined($self->{query_timer_restart})); | ||||||
|  |  | ||||||
|  | 	# Feed the query to psql's stdin, followed by \n (so psql processes the | ||||||
|  | 	# line), by a ; (so that psql issues the query, if it doesnt't include a ; | ||||||
|  | 	# itself), and a separator echoed with \echo, that we can wait on. | ||||||
|  | 	my $banner = "background_psql: QUERY_SEPARATOR"; | ||||||
|  | 	$self->{stdin} .= "$query\n;\n\\echo $banner\n"; | ||||||
|  |  | ||||||
|  | 	pump_until($self->{run}, $self->{timeout}, \$self->{stdout}, qr/$banner/); | ||||||
|  |  | ||||||
|  | 	die "psql query timed out" if $self->{timeout}->is_expired; | ||||||
|  | 	$output = $self->{stdout}; | ||||||
|  |  | ||||||
|  | 	# remove banner again, our caller doesn't care | ||||||
|  | 	$output =~ s/\n$banner$//s; | ||||||
|  |  | ||||||
|  | 	# clear out output for the next query | ||||||
|  | 	$self->{stdout} = ''; | ||||||
|  |  | ||||||
|  | 	$ret = $self->{stderr} eq "" ? 0 : 1; | ||||||
|  |  | ||||||
|  | 	return wantarray ? ( $output, $ret ) : $output; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =item $session->query_safe() | ||||||
|  |  | ||||||
|  | Wrapper around C<query> which errors out if the query failed to execute. | ||||||
|  | Query failure is determined by it producing output on stderr. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub query_safe | ||||||
|  | { | ||||||
|  | 	my ($self, $query) = @_; | ||||||
|  |  | ||||||
|  | 	my $ret = $self->query($query); | ||||||
|  |  | ||||||
|  | 	if ($self->{stderr} ne "") | ||||||
|  | 	{ | ||||||
|  | 		die "query failed: $self->{stderr}"; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return $ret; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =item $session->query_until(until, query) | ||||||
|  |  | ||||||
|  | Issue C<query> and wait for C<until> appearing in the query output rather than | ||||||
|  | waiting for query completion. C<query> needs to end with newline and semicolon | ||||||
|  | (if applicable, interactive psql input may not require it) for psql to process | ||||||
|  | the input. | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub query_until | ||||||
|  | { | ||||||
|  | 	my ($self, $until, $query) = @_; | ||||||
|  | 	my $ret; | ||||||
|  | 	local $Test::Builder::Level = $Test::Builder::Level + 1; | ||||||
|  |  | ||||||
|  | 	$self->{timeout}->start() if (defined($self->{query_timer_restart})); | ||||||
|  | 	$self->{stdin} .= $query; | ||||||
|  |  | ||||||
|  | 	pump_until($self->{run}, $self->{timeout}, \$self->{stdout}, $until); | ||||||
|  |  | ||||||
|  | 	die "psql query timed out" if $self->{timeout}->is_expired; | ||||||
|  |  | ||||||
|  | 	$ret = $self->{stdout}; | ||||||
|  |  | ||||||
|  | 	# clear out output for the next query | ||||||
|  | 	$self->{stdout} = ''; | ||||||
|  |  | ||||||
|  | 	return $ret; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | =pod | ||||||
|  |  | ||||||
|  | =item $session->set_query_timer_restart() | ||||||
|  |  | ||||||
|  | Configures the timer to be restarted before each query such that the defined | ||||||
|  | timeout is valid per query rather than per test run. | ||||||
|  |  | ||||||
|  | =back | ||||||
|  |  | ||||||
|  | =cut | ||||||
|  |  | ||||||
|  | sub set_query_timer_restart | ||||||
|  | { | ||||||
|  | 	my $self = shift; | ||||||
|  |  | ||||||
|  | 	$self->{query_timer_restart} = shift if @_; | ||||||
|  | 	return $self->{query_timer_restart}; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | 1; | ||||||
| @@ -113,6 +113,7 @@ use PostgreSQL::Test::RecursiveCopy; | |||||||
| use Socket; | use Socket; | ||||||
| use Test::More; | use Test::More; | ||||||
| use PostgreSQL::Test::Utils (); | use PostgreSQL::Test::Utils (); | ||||||
|  | use PostgreSQL::Test::BackgroundPsql (); | ||||||
| use Time::HiRes qw(usleep); | use Time::HiRes qw(usleep); | ||||||
| use Scalar::Util qw(blessed); | use Scalar::Util qw(blessed); | ||||||
|  |  | ||||||
| @@ -1966,18 +1967,12 @@ sub psql | |||||||
|  |  | ||||||
| =pod | =pod | ||||||
|  |  | ||||||
| =item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness | =item $node->background_psql($dbname, %params) => PostgreSQL::Test::BackgroundPsql instance | ||||||
|  |  | ||||||
| Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the | Invoke B<psql> on B<$dbname> and return a BackgroundPsql object. | ||||||
| caller may use to send input to B<psql>.  The process's stdin is sourced from |  | ||||||
| the $stdin scalar reference, and its stdout and stderr go to the $stdout |  | ||||||
| scalar reference.  This allows the caller to act on other parts of the system |  | ||||||
| while idling this backend. |  | ||||||
|  |  | ||||||
| The specified timer object is attached to the harness, as well.  It's caller's | A default timeout of $PostgreSQL::Test::Utils::timeout_default is set up, | ||||||
| responsibility to set the timeout length (usually | which can be modified later. | ||||||
| $PostgreSQL::Test::Utils::timeout_default), and to restart the timer after |  | ||||||
| each command if the timeout is per-command. |  | ||||||
|  |  | ||||||
| psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc> | psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc> | ||||||
| disabled.  That may be overridden by passing extra psql parameters. | disabled.  That may be overridden by passing extra psql parameters. | ||||||
| @@ -1986,7 +1981,7 @@ Dies on failure to invoke psql, or if psql fails to connect.  Errors occurring | |||||||
| later are the caller's problem.  psql runs with on_error_stop by default so | later are the caller's problem.  psql runs with on_error_stop by default so | ||||||
| that it will stop running sql and return 3 if passed SQL results in an error. | that it will stop running sql and return 3 if passed SQL results in an error. | ||||||
|  |  | ||||||
| Be sure to "finish" the harness when done with it. | Be sure to "quit" the returned object when done with it. | ||||||
|  |  | ||||||
| =over | =over | ||||||
|  |  | ||||||
| @@ -2012,7 +2007,7 @@ If given, it must be an array reference containing additional parameters to B<ps | |||||||
|  |  | ||||||
| sub background_psql | sub background_psql | ||||||
| { | { | ||||||
| 	my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_; | 	my ($self, $dbname, %params) = @_; | ||||||
|  |  | ||||||
| 	local %ENV = $self->_get_env(); | 	local %ENV = $self->_get_env(); | ||||||
|  |  | ||||||
| @@ -2033,41 +2028,18 @@ sub background_psql | |||||||
| 	push @psql_params, @{ $params{extra_params} } | 	push @psql_params, @{ $params{extra_params} } | ||||||
| 	  if defined $params{extra_params}; | 	  if defined $params{extra_params}; | ||||||
|  |  | ||||||
| 	# Ensure there is no data waiting to be sent: | 	return PostgreSQL::Test::BackgroundPsql->new(0, \@psql_params); | ||||||
| 	$$stdin = "" if ref($stdin); |  | ||||||
| 	# IPC::Run would otherwise append to existing contents: |  | ||||||
| 	$$stdout = "" if ref($stdout); |  | ||||||
|  |  | ||||||
| 	my $harness = IPC::Run::start \@psql_params, |  | ||||||
| 	  '<', $stdin, '>', $stdout, $timer; |  | ||||||
|  |  | ||||||
| 	# Request some output, and pump until we see it.  This means that psql |  | ||||||
| 	# connection failures are caught here, relieving callers of the need to |  | ||||||
| 	# handle those.  (Right now, we have no particularly good handling for |  | ||||||
| 	# errors anyway, but that might be added later.) |  | ||||||
| 	my $banner = "background_psql: ready"; |  | ||||||
| 	$$stdin = "\\echo $banner\n"; |  | ||||||
| 	pump $harness until $$stdout =~ /$banner/ || $timer->is_expired; |  | ||||||
|  |  | ||||||
| 	die "psql startup timed out" if $timer->is_expired; |  | ||||||
|  |  | ||||||
| 	return $harness; |  | ||||||
| } | } | ||||||
|  |  | ||||||
| =pod | =pod | ||||||
|  |  | ||||||
| =item $node->interactive_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness | =item $node->interactive_psql($dbname, %params) => BackgroundPsql instance | ||||||
|  |  | ||||||
| Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, | Invoke B<psql> on B<$dbname> and return a BackgroundPsql object, which the | ||||||
| which the caller may use to send interactive input to B<psql>. | caller may use to send interactive input to B<psql>. | ||||||
| The process's stdin is sourced from the $stdin scalar reference, |  | ||||||
| and its stdout and stderr go to the $stdout scalar reference. |  | ||||||
| ptys are used so that psql thinks it's being called interactively. |  | ||||||
|  |  | ||||||
| The specified timer object is attached to the harness, as well.  It's caller's | A default timeout of $PostgreSQL::Test::Utils::timeout_default is set up, | ||||||
| responsibility to set the timeout length (usually | which can be modified later. | ||||||
| $PostgreSQL::Test::Utils::timeout_default), and to restart the timer after |  | ||||||
| each command if the timeout is per-command. |  | ||||||
|  |  | ||||||
| psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc> | psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc> | ||||||
| disabled.  That may be overridden by passing extra psql parameters. | disabled.  That may be overridden by passing extra psql parameters. | ||||||
| @@ -2075,7 +2047,7 @@ disabled.  That may be overridden by passing extra psql parameters. | |||||||
| Dies on failure to invoke psql, or if psql fails to connect. | Dies on failure to invoke psql, or if psql fails to connect. | ||||||
| Errors occurring later are the caller's problem. | Errors occurring later are the caller's problem. | ||||||
|  |  | ||||||
| Be sure to "finish" the harness when done with it. | Be sure to "quit" the returned object when done with it. | ||||||
|  |  | ||||||
| The only extra parameter currently accepted is | The only extra parameter currently accepted is | ||||||
|  |  | ||||||
| @@ -2093,7 +2065,7 @@ This requires IO::Pty in addition to IPC::Run. | |||||||
|  |  | ||||||
| sub interactive_psql | sub interactive_psql | ||||||
| { | { | ||||||
| 	my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_; | 	my ($self, $dbname, %params) = @_; | ||||||
|  |  | ||||||
| 	local %ENV = $self->_get_env(); | 	local %ENV = $self->_get_env(); | ||||||
|  |  | ||||||
| @@ -2104,26 +2076,7 @@ sub interactive_psql | |||||||
| 	push @psql_params, @{ $params{extra_params} } | 	push @psql_params, @{ $params{extra_params} } | ||||||
| 	  if defined $params{extra_params}; | 	  if defined $params{extra_params}; | ||||||
|  |  | ||||||
| 	# Ensure there is no data waiting to be sent: | 	return PostgreSQL::Test::BackgroundPsql->new(1, \@psql_params); | ||||||
| 	$$stdin = "" if ref($stdin); |  | ||||||
| 	# IPC::Run would otherwise append to existing contents: |  | ||||||
| 	$$stdout = "" if ref($stdout); |  | ||||||
|  |  | ||||||
| 	my $harness = IPC::Run::start \@psql_params, |  | ||||||
| 	  '<pty<', $stdin, '>pty>', $stdout, $timer; |  | ||||||
|  |  | ||||||
| 	# Pump until we see psql's help banner.  This ensures that callers |  | ||||||
| 	# won't write anything to the pty before it's ready, avoiding an |  | ||||||
| 	# implementation issue in IPC::Run.  Also, it means that psql |  | ||||||
| 	# connection failures are caught here, relieving callers of |  | ||||||
| 	# the need to handle those.  (Right now, we have no particularly |  | ||||||
| 	# good handling for errors anyway, but that might be added later.) |  | ||||||
| 	pump $harness |  | ||||||
| 	  until $$stdout =~ /Type "help" for help/ || $timer->is_expired; |  | ||||||
|  |  | ||||||
| 	die "psql startup timed out" if $timer->is_expired; |  | ||||||
|  |  | ||||||
| 	return $harness; |  | ||||||
| } | } | ||||||
|  |  | ||||||
| # Common sub of pgbench-invoking interfaces.  Makes any requested script files | # Common sub of pgbench-invoking interfaces.  Makes any requested script files | ||||||
|   | |||||||
| @@ -28,7 +28,6 @@ use PostgreSQL::Test::Cluster; | |||||||
| use PostgreSQL::Test::Utils; | use PostgreSQL::Test::Utils; | ||||||
| use Test::More; | use Test::More; | ||||||
| use File::Copy; | use File::Copy; | ||||||
| use IPC::Run (); |  | ||||||
| use Scalar::Util qw(blessed); | use Scalar::Util qw(blessed); | ||||||
|  |  | ||||||
| my ($stdout, $stderr, $ret); | my ($stdout, $stderr, $ret); | ||||||
|   | |||||||
| @@ -67,14 +67,8 @@ $node_primary->wait_for_replay_catchup($node_standby); | |||||||
|  |  | ||||||
|  |  | ||||||
| # a longrunning psql that we can use to trigger conflicts | # a longrunning psql that we can use to trigger conflicts | ||||||
| my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); | my $psql_standby = $node_standby->background_psql($test_db, | ||||||
| my %psql_standby = ('stdin' => '', 'stdout' => ''); | 	on_error_stop => 0); | ||||||
| $psql_standby{run} = |  | ||||||
|   $node_standby->background_psql($test_db, \$psql_standby{stdin}, |  | ||||||
| 	\$psql_standby{stdout}, |  | ||||||
| 	$psql_timeout); |  | ||||||
| $psql_standby{stdout} = ''; |  | ||||||
|  |  | ||||||
| my $expected_conflicts = 0; | my $expected_conflicts = 0; | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -102,15 +96,14 @@ my $cursor1 = "test_recovery_conflict_cursor"; | |||||||
|  |  | ||||||
| # DECLARE and use a cursor on standby, causing buffer with the only block of | # DECLARE and use a cursor on standby, causing buffer with the only block of | ||||||
| # the relation to be pinned on the standby | # the relation to be pinned on the standby | ||||||
| $psql_standby{stdin} .= qq[ | my $res = $psql_standby->query_safe(qq[ | ||||||
|         BEGIN; |     BEGIN; | ||||||
|         DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; |     DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; | ||||||
|         FETCH FORWARD FROM $cursor1; |     FETCH FORWARD FROM $cursor1; | ||||||
|         ]; | ]); | ||||||
| # FETCH FORWARD should have returned a 0 since all values of b in the table | # FETCH FORWARD should have returned a 0 since all values of b in the table | ||||||
| # are 0 | # are 0 | ||||||
| ok(pump_until_standby(qr/^0$/m), | like($res, qr/^0$/m, "$sect: cursor with conflicting pin established"); | ||||||
| 	"$sect: cursor with conflicting pin established"); |  | ||||||
|  |  | ||||||
| # to check the log starting now for recovery conflict messages | # to check the log starting now for recovery conflict messages | ||||||
| my $log_location = -s $node_standby->logfile; | my $log_location = -s $node_standby->logfile; | ||||||
| @@ -125,7 +118,7 @@ $node_primary->safe_psql($test_db, qq[VACUUM $table1;]); | |||||||
| $node_primary->wait_for_replay_catchup($node_standby); | $node_primary->wait_for_replay_catchup($node_standby); | ||||||
|  |  | ||||||
| check_conflict_log("User was holding shared buffer pin for too long"); | check_conflict_log("User was holding shared buffer pin for too long"); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
| check_conflict_stat("bufferpin"); | check_conflict_stat("bufferpin"); | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -138,15 +131,12 @@ $node_primary->safe_psql($test_db, | |||||||
| $node_primary->wait_for_replay_catchup($node_standby); | $node_primary->wait_for_replay_catchup($node_standby); | ||||||
|  |  | ||||||
| # DECLARE and FETCH from cursor on the standby | # DECLARE and FETCH from cursor on the standby | ||||||
| $psql_standby{stdin} .= qq[ | $res = $psql_standby->query_safe(qq[ | ||||||
|         BEGIN; |         BEGIN; | ||||||
|         DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; |         DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; | ||||||
|         FETCH FORWARD FROM $cursor1; |         FETCH FORWARD FROM $cursor1; | ||||||
|         ]; |         ]); | ||||||
| ok( pump_until( | like($res, qr/^0$/m, "$sect: cursor with conflicting snapshot established"); | ||||||
| 		$psql_standby{run},     $psql_timeout, |  | ||||||
| 		\$psql_standby{stdout}, qr/^0$/m,), |  | ||||||
| 	"$sect: cursor with conflicting snapshot established"); |  | ||||||
|  |  | ||||||
| # Do some HOT updates | # Do some HOT updates | ||||||
| $node_primary->safe_psql($test_db, | $node_primary->safe_psql($test_db, | ||||||
| @@ -160,7 +150,7 @@ $node_primary->wait_for_replay_catchup($node_standby); | |||||||
|  |  | ||||||
| check_conflict_log( | check_conflict_log( | ||||||
| 	"User query might have needed to see row versions that must be removed"); | 	"User query might have needed to see row versions that must be removed"); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
| check_conflict_stat("snapshot"); | check_conflict_stat("snapshot"); | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -169,12 +159,12 @@ $sect = "lock conflict"; | |||||||
| $expected_conflicts++; | $expected_conflicts++; | ||||||
|  |  | ||||||
| # acquire lock to conflict with | # acquire lock to conflict with | ||||||
| $psql_standby{stdin} .= qq[ | $res = $psql_standby->query_safe(qq[ | ||||||
|         BEGIN; |         BEGIN; | ||||||
|         LOCK TABLE $table1 IN ACCESS SHARE MODE; |         LOCK TABLE $table1 IN ACCESS SHARE MODE; | ||||||
|         SELECT 1; |         SELECT 1; | ||||||
|         ]; |         ]); | ||||||
| ok(pump_until_standby(qr/^1$/m), "$sect: conflicting lock acquired"); | like($res, qr/^1$/m, "$sect: conflicting lock acquired"); | ||||||
|  |  | ||||||
| # DROP TABLE containing block which standby has in a pinned buffer | # DROP TABLE containing block which standby has in a pinned buffer | ||||||
| $node_primary->safe_psql($test_db, qq[DROP TABLE $table1;]); | $node_primary->safe_psql($test_db, qq[DROP TABLE $table1;]); | ||||||
| @@ -182,7 +172,7 @@ $node_primary->safe_psql($test_db, qq[DROP TABLE $table1;]); | |||||||
| $node_primary->wait_for_replay_catchup($node_standby); | $node_primary->wait_for_replay_catchup($node_standby); | ||||||
|  |  | ||||||
| check_conflict_log("User was holding a relation lock for too long"); | check_conflict_log("User was holding a relation lock for too long"); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
| check_conflict_stat("lock"); | check_conflict_stat("lock"); | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -193,14 +183,14 @@ $expected_conflicts++; | |||||||
| # DECLARE a cursor for a query which, with sufficiently low work_mem, will | # DECLARE a cursor for a query which, with sufficiently low work_mem, will | ||||||
| # spill tuples into temp files in the temporary tablespace created during | # spill tuples into temp files in the temporary tablespace created during | ||||||
| # setup. | # setup. | ||||||
| $psql_standby{stdin} .= qq[ | $res = $psql_standby->query_safe(qq[ | ||||||
|         BEGIN; |         BEGIN; | ||||||
|         SET work_mem = '64kB'; |         SET work_mem = '64kB'; | ||||||
|         DECLARE $cursor1 CURSOR FOR |         DECLARE $cursor1 CURSOR FOR | ||||||
|           SELECT count(*) FROM generate_series(1,6000); |           SELECT count(*) FROM generate_series(1,6000); | ||||||
|         FETCH FORWARD FROM $cursor1; |         FETCH FORWARD FROM $cursor1; | ||||||
|         ]; |         ]); | ||||||
| ok(pump_until_standby(qr/^6000$/m), | like($res, qr/^6000$/m, | ||||||
| 	"$sect: cursor with conflicting temp file established"); | 	"$sect: cursor with conflicting temp file established"); | ||||||
|  |  | ||||||
| # Drop the tablespace currently containing spill files for the query on the | # Drop the tablespace currently containing spill files for the query on the | ||||||
| @@ -211,7 +201,7 @@ $node_primary->wait_for_replay_catchup($node_standby); | |||||||
|  |  | ||||||
| check_conflict_log( | check_conflict_log( | ||||||
| 	"User was or might have been using tablespace that must be dropped"); | 	"User was or might have been using tablespace that must be dropped"); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
| check_conflict_stat("tablespace"); | check_conflict_stat("tablespace"); | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -227,7 +217,7 @@ $node_standby->adjust_conf( | |||||||
| 	'max_standby_streaming_delay', | 	'max_standby_streaming_delay', | ||||||
| 	"${PostgreSQL::Test::Utils::timeout_default}s"); | 	"${PostgreSQL::Test::Utils::timeout_default}s"); | ||||||
| $node_standby->restart(); | $node_standby->restart(); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
|  |  | ||||||
| # Generate a few dead rows, to later be cleaned up by vacuum. Then acquire a | # Generate a few dead rows, to later be cleaned up by vacuum. Then acquire a | ||||||
| # lock on another relation in a prepared xact, so it's held continuously by | # lock on another relation in a prepared xact, so it's held continuously by | ||||||
| @@ -250,19 +240,15 @@ SELECT txid_current(); | |||||||
|  |  | ||||||
| $node_primary->wait_for_replay_catchup($node_standby); | $node_primary->wait_for_replay_catchup($node_standby); | ||||||
|  |  | ||||||
| $psql_standby{stdin} .= qq[ | $res = $psql_standby->query_until(qr/^1$/m, qq[ | ||||||
|     BEGIN; |     BEGIN; | ||||||
|     -- hold pin |     -- hold pin | ||||||
|     DECLARE $cursor1 CURSOR FOR SELECT a FROM $table1; |     DECLARE $cursor1 CURSOR FOR SELECT a FROM $table1; | ||||||
|     FETCH FORWARD FROM $cursor1; |     FETCH FORWARD FROM $cursor1; | ||||||
|     -- wait for lock held by prepared transaction |     -- wait for lock held by prepared transaction | ||||||
| 	SELECT * FROM $table2; | 	SELECT * FROM $table2; | ||||||
|     ]; |     ]); | ||||||
| ok( pump_until( | ok( 1, "$sect: cursor holding conflicting pin, also waiting for lock, established"); | ||||||
| 		$psql_standby{run},     $psql_timeout, |  | ||||||
| 		\$psql_standby{stdout}, qr/^1$/m,), |  | ||||||
| 	"$sect: cursor holding conflicting pin, also waiting for lock, established" |  | ||||||
| ); |  | ||||||
|  |  | ||||||
| # just to make sure we're waiting for lock already | # just to make sure we're waiting for lock already | ||||||
| ok( $node_standby->poll_query_until( | ok( $node_standby->poll_query_until( | ||||||
| @@ -277,7 +263,7 @@ $node_primary->safe_psql($test_db, qq[VACUUM $table1;]); | |||||||
| $node_primary->wait_for_replay_catchup($node_standby); | $node_primary->wait_for_replay_catchup($node_standby); | ||||||
|  |  | ||||||
| check_conflict_log("User transaction caused buffer deadlock with recovery."); | check_conflict_log("User transaction caused buffer deadlock with recovery."); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
| check_conflict_stat("deadlock"); | check_conflict_stat("deadlock"); | ||||||
|  |  | ||||||
| # clean up for next tests | # clean up for next tests | ||||||
| @@ -285,7 +271,7 @@ $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]); | |||||||
| $node_standby->adjust_conf('postgresql.conf', 'max_standby_streaming_delay', | $node_standby->adjust_conf('postgresql.conf', 'max_standby_streaming_delay', | ||||||
| 	'50ms'); | 	'50ms'); | ||||||
| $node_standby->restart(); | $node_standby->restart(); | ||||||
| reconnect_and_clear(); | $psql_standby->reconnect_and_clear(); | ||||||
|  |  | ||||||
|  |  | ||||||
| # Check that expected number of conflicts show in pg_stat_database. Needs to | # Check that expected number of conflicts show in pg_stat_database. Needs to | ||||||
| @@ -309,8 +295,7 @@ check_conflict_log("User was connected to a database that must be dropped"); | |||||||
|  |  | ||||||
| # explicitly shut down psql instances gracefully - to avoid hangs or worse on | # explicitly shut down psql instances gracefully - to avoid hangs or worse on | ||||||
| # windows | # windows | ||||||
| $psql_standby{stdin} .= "\\q\n"; | $psql_standby->quit; | ||||||
| $psql_standby{run}->finish; |  | ||||||
|  |  | ||||||
| $node_standby->stop(); | $node_standby->stop(); | ||||||
| $node_primary->stop(); | $node_primary->stop(); | ||||||
| @@ -318,37 +303,6 @@ $node_primary->stop(); | |||||||
|  |  | ||||||
| done_testing(); | done_testing(); | ||||||
|  |  | ||||||
|  |  | ||||||
| sub pump_until_standby |  | ||||||
| { |  | ||||||
| 	my $match = shift; |  | ||||||
|  |  | ||||||
| 	return pump_until($psql_standby{run}, $psql_timeout, |  | ||||||
| 		\$psql_standby{stdout}, $match); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| sub reconnect_and_clear |  | ||||||
| { |  | ||||||
| 	# If psql isn't dead already, tell it to quit as \q, when already dead, |  | ||||||
| 	# causes IPC::Run to unhelpfully error out with "ack Broken pipe:". |  | ||||||
| 	$psql_standby{run}->pump_nb(); |  | ||||||
| 	if ($psql_standby{run}->pumpable()) |  | ||||||
| 	{ |  | ||||||
| 		$psql_standby{stdin} .= "\\q\n"; |  | ||||||
| 	} |  | ||||||
| 	$psql_standby{run}->finish; |  | ||||||
|  |  | ||||||
| 	# restart |  | ||||||
| 	$psql_standby{run}->run(); |  | ||||||
| 	$psql_standby{stdin}  = ''; |  | ||||||
| 	$psql_standby{stdout} = ''; |  | ||||||
|  |  | ||||||
| 	# Run query to ensure connection has finished re-establishing |  | ||||||
| 	$psql_standby{stdin} .= qq[SELECT 1;\n]; |  | ||||||
| 	die unless pump_until_standby(qr/^1$/m); |  | ||||||
| 	$psql_standby{stdout} = ''; |  | ||||||
| } |  | ||||||
|  |  | ||||||
| sub check_conflict_log | sub check_conflict_log | ||||||
| { | { | ||||||
| 	my $message          = shift; | 	my $message          = shift; | ||||||
|   | |||||||
| @@ -28,26 +28,20 @@ sub test_streaming | |||||||
| 	my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; | 	my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; | ||||||
|  |  | ||||||
| 	# Interleave a pair of transactions, each exceeding the 64kB limit. | 	# Interleave a pair of transactions, each exceeding the 64kB limit. | ||||||
| 	my $in  = ''; |  | ||||||
| 	my $out = ''; |  | ||||||
|  |  | ||||||
| 	my $offset = 0; | 	my $offset = 0; | ||||||
|  |  | ||||||
| 	my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); | 	my $h = $node_publisher->background_psql('postgres', | ||||||
|  |  | ||||||
| 	my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, |  | ||||||
| 		on_error_stop => 0); | 		on_error_stop => 0); | ||||||
|  |  | ||||||
| 	# Check the subscriber log from now on. | 	# Check the subscriber log from now on. | ||||||
| 	$offset = -s $node_subscriber->logfile; | 	$offset = -s $node_subscriber->logfile; | ||||||
|  |  | ||||||
| 	$in .= q{ | 	$h->query_safe(q{ | ||||||
| 	BEGIN; | 	BEGIN; | ||||||
| 	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); | 	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); | ||||||
| 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; | 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; | ||||||
| 	DELETE FROM test_tab WHERE mod(a,3) = 0; | 	DELETE FROM test_tab WHERE mod(a,3) = 0; | ||||||
| 	}; | 	}); | ||||||
| 	$h->pump_nb; |  | ||||||
|  |  | ||||||
| 	$node_publisher->safe_psql( | 	$node_publisher->safe_psql( | ||||||
| 		'postgres', q{ | 		'postgres', q{ | ||||||
| @@ -57,11 +51,9 @@ sub test_streaming | |||||||
| 	COMMIT; | 	COMMIT; | ||||||
| 	}); | 	}); | ||||||
|  |  | ||||||
| 	$in .= q{ | 	$h->query_safe('COMMIT'); | ||||||
| 	COMMIT; |     # errors make the next test fail, so ignore them here | ||||||
| 	\q | 	$h->quit; | ||||||
| 	}; |  | ||||||
| 	$h->finish;    # errors make the next test fail, so ignore them here |  | ||||||
|  |  | ||||||
| 	$node_publisher->wait_for_catchup($appname); | 	$node_publisher->wait_for_catchup($appname); | ||||||
|  |  | ||||||
| @@ -219,12 +211,7 @@ $node_subscriber->reload; | |||||||
| $node_subscriber->safe_psql('postgres', q{SELECT 1}); | $node_subscriber->safe_psql('postgres', q{SELECT 1}); | ||||||
|  |  | ||||||
| # Interleave a pair of transactions, each exceeding the 64kB limit. | # Interleave a pair of transactions, each exceeding the 64kB limit. | ||||||
| my $in  = ''; | my $h = $node_publisher->background_psql('postgres', | ||||||
| my $out = ''; |  | ||||||
|  |  | ||||||
| my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); |  | ||||||
|  |  | ||||||
| my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, |  | ||||||
| 	on_error_stop => 0); | 	on_error_stop => 0); | ||||||
|  |  | ||||||
| # Confirm if a deadlock between the leader apply worker and the parallel apply | # Confirm if a deadlock between the leader apply worker and the parallel apply | ||||||
| @@ -232,11 +219,10 @@ my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, | |||||||
|  |  | ||||||
| my $offset = -s $node_subscriber->logfile; | my $offset = -s $node_subscriber->logfile; | ||||||
|  |  | ||||||
| $in .= q{ | $h->query_safe(q{ | ||||||
| BEGIN; | BEGIN; | ||||||
| INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); | INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); | ||||||
| }; | }); | ||||||
| $h->pump_nb; |  | ||||||
|  |  | ||||||
| # Ensure that the parallel apply worker executes the insert command before the | # Ensure that the parallel apply worker executes the insert command before the | ||||||
| # leader worker. | # leader worker. | ||||||
| @@ -246,11 +232,8 @@ $node_subscriber->wait_for_log( | |||||||
|  |  | ||||||
| $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); | $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); | ||||||
|  |  | ||||||
| $in .= q{ | $h->query_safe('COMMIT'); | ||||||
| COMMIT; | $h->quit; | ||||||
| \q |  | ||||||
| }; |  | ||||||
| $h->finish; |  | ||||||
|  |  | ||||||
| $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, | $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, | ||||||
| 	$offset); | 	$offset); | ||||||
| @@ -277,11 +260,10 @@ $node_subscriber->safe_psql('postgres', | |||||||
| # Check the subscriber log from now on. | # Check the subscriber log from now on. | ||||||
| $offset = -s $node_subscriber->logfile; | $offset = -s $node_subscriber->logfile; | ||||||
|  |  | ||||||
| $in .= q{ | $h->query_safe(q{ | ||||||
| BEGIN; | BEGIN; | ||||||
| INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); | INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); | ||||||
| }; | }); | ||||||
| $h->pump_nb; |  | ||||||
|  |  | ||||||
| # Ensure that the first parallel apply worker executes the insert command | # Ensure that the first parallel apply worker executes the insert command | ||||||
| # before the second one. | # before the second one. | ||||||
| @@ -292,11 +274,8 @@ $node_subscriber->wait_for_log( | |||||||
| $node_publisher->safe_psql('postgres', | $node_publisher->safe_psql('postgres', | ||||||
| 	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); | 	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); | ||||||
|  |  | ||||||
| $in .= q{ | $h->query_safe('COMMIT'); | ||||||
| COMMIT; | $h->quit; | ||||||
| \q |  | ||||||
| }; |  | ||||||
| $h->finish; |  | ||||||
|  |  | ||||||
| $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, | $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, | ||||||
| 	$offset); | 	$offset); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user