From 0c20dd33db1607d6a85ffce24238c1e55e384b49 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 3 Aug 2022 15:31:17 +0530 Subject: [PATCH] Add wait_for_subscription_sync for TAP tests. The TAP tests for logical replication in src/test/subscription are using the following code in many places to make sure that the subscription is synchronized with the publisher: $node_publisher->wait_for_catchup('tap_sub'); $node_subscriber->poll_query_until('postgres', qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's')]); The new function wait_for_subscription_sync() can be used to replace the above code. This eliminates duplicated code and makes it easier to write future tests. Author: Masahiko Sawada Reviewed by: Amit Kapila, Shi yu Discussion: https://postgr.es/m/CAD21AoC-fvAkaKHa4t1urupwL8xbAcWRePeETvshvy80f6WV1A@mail.gmail.com --- src/test/perl/PostgreSQL/Test/Cluster.pm | 44 ++++++++++++++++ src/test/subscription/t/001_rep_changes.pl | 18 ++----- src/test/subscription/t/002_types.pl | 7 +-- src/test/subscription/t/004_sync.pl | 18 ++----- src/test/subscription/t/005_encoding.pl | 9 +--- src/test/subscription/t/006_rewrite.pl | 9 +--- src/test/subscription/t/007_ddl.pl | 9 +--- src/test/subscription/t/008_diff_schema.pl | 12 ++--- src/test/subscription/t/010_truncate.pl | 8 +-- src/test/subscription/t/011_generated.pl | 5 +- src/test/subscription/t/013_partition.pl | 20 +++----- src/test/subscription/t/014_binary.pl | 5 +- src/test/subscription/t/015_stream.pl | 9 +--- src/test/subscription/t/016_stream_subxact.pl | 9 +--- src/test/subscription/t/017_stream_ddl.pl | 9 +--- .../t/018_stream_subxact_abort.pl | 9 +--- .../t/019_stream_subxact_ddl_abort.pl | 9 +--- src/test/subscription/t/021_twophase.pl | 18 ++----- .../subscription/t/023_twophase_stream.pl | 10 +--- src/test/subscription/t/024_add_drop_pub.pl | 18 ++----- .../t/025_rep_changes_for_schema.pl | 18 ++----- src/test/subscription/t/027_nosuperuser.pl | 9 +--- src/test/subscription/t/028_row_filter.pl | 19 ++----- src/test/subscription/t/029_on_error.pl | 5 +- src/test/subscription/t/030_origin.pl | 19 ++----- src/test/subscription/t/031_column_list.pl | 50 +++++++------------ src/test/subscription/t/100_bugs.pl | 14 ++---- 27 files changed, 129 insertions(+), 260 deletions(-) diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index c8c7bc5045a..27fa607da41 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -2648,6 +2648,50 @@ sub wait_for_slot_catchup =pod +=item $node->wait_for_subscription_sync(publisher, subname, dbname) + +Wait for all tables in pg_subscription_rel to complete the initial +synchronization (i.e to be either in 'syncdone' or 'ready' state). + +If the publisher node is given, additionally, check if the subscriber has +caught up to what has been committed on the primary. This is useful to +ensure that the initial data synchronization has been completed after +creating a new subscription. + +If there is no active replication connection from this peer, wait until +poll_query_until timeout. + +This is not a test. It die()s on failure. + +=cut + +sub wait_for_subscription_sync +{ + my ($self, $publisher, $subname, $dbname) = @_; + my $name = $self->name; + + $dbname = defined($dbname) ? $dbname : 'postgres'; + + # Wait for all tables to finish initial sync. + print "Waiting for all subscriptions in \"$name\" to synchronize data\n"; + my $query = + qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');]; + $self->poll_query_until($dbname, $query) + or croak "timed out waiting for subscriber to synchronize data"; + + # Then, wait for the replication to catchup if required. + if (defined($publisher)) + { + croak 'subscription name must be specified' unless defined($subname); + $publisher->wait_for_catchup($subname); + } + + print "done\n"; + return; +} + +=pod + =item $node->wait_for_log(regexp, offset) Waits for the contents of the server log file, starting at the given offset, to diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index f53b3b7db0c..c5b5be419cc 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -102,13 +102,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); @@ -237,13 +232,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2" ); -$node_publisher->wait_for_catchup('tap_sub_temp1'); - -# Also wait for initial table sync to finish -$synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_temp1'); # Subscriber table will have no rows initially $result = diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index 3f1f00f7c8d..d6c6f493272 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -114,13 +114,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" ); -$node_publisher->wait_for_catchup('tap_sub'); - # Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); # Insert initial test data $node_publisher->safe_psql( diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index cf61fc1e0f6..fd4bf7bacd1 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -39,13 +39,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); @@ -71,8 +66,7 @@ $node_subscriber->poll_query_until('postgres', $started_query) $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); # wait for sync to finish this time -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # check that all data is synced $result = @@ -107,8 +101,7 @@ $node_subscriber->safe_psql('postgres', ); # and wait for data sync to finish again -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # check that all data is synced $result = @@ -133,8 +126,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); # wait for sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl index 38a74a897f8..3ee0522460b 100644 --- a/src/test/subscription/t/005_encoding.pl +++ b/src/test/subscription/t/005_encoding.pl @@ -32,13 +32,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" ); -$node_publisher->wait_for_catchup('mysub'); - -# Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub'); $node_publisher->safe_psql('postgres', q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8 diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl index c924ff35f71..fdcb3f811ce 100644 --- a/src/test/subscription/t/006_rewrite.pl +++ b/src/test/subscription/t/006_rewrite.pl @@ -28,13 +28,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" ); -$node_publisher->wait_for_catchup('mysub'); - -# Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub'); $node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl index 01df54229c4..8882addc18f 100644 --- a/src/test/subscription/t/007_ddl.pl +++ b/src/test/subscription/t/007_ddl.pl @@ -49,13 +49,8 @@ ok( $stderr =~ m/WARNING: publication "non_existent_pub" does not exist in the publisher/, "Create subscription throws warning for non-existent publication"); -$node_publisher->wait_for_catchup('mysub1'); - -# Also wait for initial table sync to finish. -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish. +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1'); # Specifying non-existent publication along with add publication. ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl index 67b4026afa4..b4d44a200bb 100644 --- a/src/test/subscription/t/008_diff_schema.pl +++ b/src/test/subscription/t/008_diff_schema.pl @@ -38,13 +38,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); my $result = $node_subscriber->safe_psql('postgres', @@ -105,8 +100,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)"); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # Add replica identity column. (The serial is not necessary, but it's # a convenient way to get a default on the new column so that rows diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index d5192494313..a6fe82a71f2 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -67,10 +67,7 @@ $node_subscriber->safe_psql('postgres', ); # Wait for initial sync of all subscriptions -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # insert data to truncate @@ -211,8 +208,7 @@ $node_subscriber->safe_psql('postgres', ); # wait for initial data sync -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; # insert data to truncate diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl index e991a080321..3d96f6f30f8 100644 --- a/src/test/subscription/t/011_generated.pl +++ b/src/test/subscription/t/011_generated.pl @@ -40,10 +40,7 @@ $node_subscriber->safe_psql('postgres', ); # Wait for initial sync of all subscriptions -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); is( $result, qq(1|22 diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 0dfbbabc3b0..8b33e4e7ae1 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -153,12 +153,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger; }); # Wait for initial sync of all subscriptions -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber1->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber1->wait_for_subscription_sync; +$node_subscriber2->wait_for_subscription_sync; # Tests for replication using leaf partition identity and schema @@ -490,10 +486,8 @@ $node_subscriber2->safe_psql('postgres', "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all"); # Wait for initial sync of all subscriptions -$node_subscriber1->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber1->wait_for_subscription_sync; +$node_subscriber2->wait_for_subscription_sync; # check that data is synced correctly $result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2"); @@ -568,8 +562,7 @@ $node_subscriber2->safe_psql('postgres', # make sure the subscription on the second subscriber is synced, before # continuing -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->wait_for_subscription_sync; # Insert a change into the leaf partition, should be replicated through # the partition root (thanks to the FOR ALL TABLES partition). @@ -824,8 +817,7 @@ $node_subscriber2->safe_psql( $node_subscriber2->safe_psql('postgres', "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); -$node_subscriber2->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->wait_for_subscription_sync; # Make partition map cache $node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl index a1f03e7adc2..8d8b35721fc 100644 --- a/src/test/subscription/t/014_binary.pl +++ b/src/test/subscription/t/014_binary.pl @@ -46,10 +46,7 @@ $node_subscriber->safe_psql('postgres', . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); # Ensure nodes are in sync with each other -$node_publisher->wait_for_catchup('tsub'); -$node_subscriber->poll_query_until('postgres', - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');" -) or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); # Insert some content and make sure it's replicated across $node_publisher->safe_psql( diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 6561b189de8..cbaa327e441 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl index f27f1694f29..bc0a9cd0531 100644 --- a/src/test/subscription/t/016_stream_subxact.pl +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl index 0bce63b7167..866f1512e47 100644 --- a/src/test/subscription/t/017_stream_ddl.pl +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 7155442e76c..551f16df6dd 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -40,13 +40,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl index dbd0fca4d1e..4d7da82b7a8 100644 --- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index c3e9857f7ce..caa90897ec0 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -53,14 +53,8 @@ $node_subscriber->safe_psql( PUBLICATION tap_pub WITH (two_phase = on)"); -# Wait for subscriber to finish initialization -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); # Also wait for two-phase to be enabled my $twophase_query = @@ -331,12 +325,8 @@ $node_subscriber->safe_psql( PUBLICATION tap_pub_copy WITH (two_phase=on, copy_data=false);"); -# Wait for subscriber to finish initialization -$node_publisher->wait_for_catchup($appname_copy); - -# Also wait for initial table sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); # Also wait for two-phase to be enabled $node_subscriber->poll_query_until('postgres', $twophase_query) diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index d8475d25a49..9b454106bdf 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -55,14 +55,8 @@ $node_subscriber->safe_psql( PUBLICATION tap_pub WITH (streaming = on, two_phase = on)"); -# Wait for subscriber to finish initialization -$node_publisher->wait_for_catchup($appname); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); # Also wait for two-phase to be enabled my $twophase_query = diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index 246f8c92372..eaf47e66f1a 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -37,13 +37,7 @@ $node_subscriber->safe_psql('postgres', ); # Wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; - -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; - -$node_publisher->wait_for_catchup('tap_sub'); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); # Check the initial data of tab_1 is copied to subscriber my $result = $node_subscriber->safe_psql('postgres', @@ -67,10 +61,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1"); # Wait for initial table sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; - -$node_publisher->wait_for_catchup('tap_sub'); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); # Check the initial data of tab_drop_refresh was copied to subscriber $result = $node_subscriber->safe_psql('postgres', @@ -82,10 +73,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1"); # Wait for initial table sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; - -$node_publisher->wait_for_catchup('tap_sub'); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); # Check the initial data of tab_1 was copied to subscriber again $result = $node_subscriber->safe_psql('postgres', diff --git a/src/test/subscription/t/025_rep_changes_for_schema.pl b/src/test/subscription/t/025_rep_changes_for_schema.pl index 5ce275cf725..627c63b529a 100644 --- a/src/test/subscription/t/025_rep_changes_for_schema.pl +++ b/src/test/subscription/t/025_rep_changes_for_schema.pl @@ -62,13 +62,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema" ); -$node_publisher->wait_for_catchup('tap_sub_schema'); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_schema'); # Check the schema table data is synced up my $result = $node_subscriber->safe_psql('postgres', @@ -123,8 +118,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION"); # Wait for sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)"); @@ -158,8 +152,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION"); # Wait for sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')" @@ -183,8 +176,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION"); # Wait for sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')" diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl index e8f3e4bba11..f9f6117dfb4 100644 --- a/src/test/subscription/t/027_nosuperuser.pl +++ b/src/test/subscription/t/027_nosuperuser.pl @@ -153,13 +153,8 @@ SET SESSION AUTHORIZATION regress_admin; CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice; )); -$node_publisher->wait_for_catchup('admin_sub'); - -# Wait for initial sync to finish as well -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub'); # Verify that "regress_admin" can replicate into the tables # diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl index b1fb2d7cae4..f5f8a670920 100644 --- a/src/test/subscription/t/028_row_filter.pl +++ b/src/test/subscription/t/028_row_filter.pl @@ -17,9 +17,6 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; - my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my $appname = 'tap_sub'; @@ -48,10 +45,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall" ); -$node_publisher->wait_for_catchup($appname); # wait for initial table synchronization to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); # The subscription of the FOR ALL TABLES publication means there should be no # filtering on the tablesync COPY, so all expect all 5 will be present. @@ -133,10 +128,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema" ); -$node_publisher->wait_for_catchup($appname); # wait for initial table synchronization to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); # The subscription of the ALL TABLES IN SCHEMA publication means there should be # no filtering on the tablesync COPY, so expect all 5 will be present. @@ -397,11 +390,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1" ); -$node_publisher->wait_for_catchup($appname); - # wait for initial table synchronization to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); # Check expected replicated rows for tab_rowfilter_1 # tap_pub_1 filter is: (a > 1000 AND b <> 'filtered') @@ -622,8 +612,7 @@ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)"); # wait for table synchronization to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)" diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index 303e8ec3fc2..05daa77c58a 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -124,10 +124,7 @@ $node_subscriber->safe_psql('postgres', "TRUNCATE tbl"); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); # Wait for the data to replicate. -$node_publisher->wait_for_catchup('sub'); -$node_subscriber->poll_query_until('postgres', - "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass" -); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub'); # Confirm that we have finished the table sync. my $result = diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index e9241d29966..b297a51f7c7 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -51,17 +51,9 @@ $node_A->safe_psql( PUBLICATION tap_pub_B WITH (origin = none, copy_data = off)"); -# Wait for subscribers to finish initialization -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_A->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_B->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_A->wait_for_subscription_sync($node_B, $appname_A); +$node_B->wait_for_subscription_sync($node_A, $appname_B1); is(1, 1, 'Bidirectional replication setup is complete'); @@ -126,10 +118,7 @@ $node_B->safe_psql( PUBLICATION tap_pub_C WITH (origin = none)"); -$node_C->wait_for_catchup($appname_B2); - -$node_B->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->wait_for_subscription_sync($node_C, $appname_B2); # insert a record $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 9fa6e0b35ff..b6644556cf4 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -22,18 +22,6 @@ $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my $offset = 0; -sub wait_for_subscription_sync -{ - my ($node) = @_; - - # Also wait for initial table sync to finish - my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; - - $node->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -} - # setup tables on both nodes # tab1: simple 1:1 replication @@ -160,7 +148,7 @@ $node_subscriber->safe_psql( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; # tab1: only (a,b) is replicated $result = @@ -333,7 +321,7 @@ $node_subscriber->safe_psql('postgres', # wait for the tablesync to complete, add a bit more data and then check # the results of the replication -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -385,9 +373,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3 )); -wait_for_subscription_sync($node_subscriber); - -$node_publisher->wait_for_catchup('sub1'); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1'); # insert data and make sure the columns in column list get fully replicated $node_publisher->safe_psql( @@ -428,7 +414,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4 )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -465,7 +451,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -504,7 +490,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5 )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -621,7 +607,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6 )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -687,7 +673,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7 )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -758,7 +744,7 @@ $node_subscriber->safe_psql( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -795,7 +781,7 @@ $node_subscriber->safe_psql( TRUNCATE test_part_c; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -855,7 +841,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9 )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -898,7 +884,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -938,7 +924,7 @@ $node_subscriber->safe_psql( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -985,7 +971,7 @@ $node_subscriber->safe_psql( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -1034,7 +1020,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -1058,7 +1044,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -1102,7 +1088,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( @@ -1150,7 +1136,7 @@ $node_subscriber->safe_psql( ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4; )); -wait_for_subscription_sync($node_subscriber); +$node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index 11ba4737159..6247aa77301 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -144,12 +144,7 @@ $node_twoways->safe_psql('d2', # We cannot rely solely on wait_for_catchup() here; it isn't sufficient # when tablesync workers might still be running. So in addition to that, # verify that tables are synced. -# XXX maybe this should be integrated in wait_for_catchup() itself. -$node_twoways->wait_for_catchup('testsub'); -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_twoways->poll_query_until('d2', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2'); is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), $rows * 2, "2x$rows rows in t"); @@ -278,11 +273,8 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -$node_publisher->wait_for_catchup('tap_sub'); - -# Also wait for initial table sync to finish -$node_subscriber->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM tab_replidentity_index"),