1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-31 22:04:40 +03:00

Back-Patch "Add wait_for_subscription_sync for TAP tests."

This was originally done in commit 0c20dd33db for 16 only, to eliminate
duplicate code and as an infrastructure that makes it easier to write
future tests. However, it has been suggested that it would be good to
back-patch this testing infrastructure to aid future tests in
back-branches.

Backpatch to all supported versions.

Author: Masahiko Sawada
Reviewed by: Amit Kapila, Shi yu
Discussion: https://postgr.es/m/CAD21AoC-fvAkaKHa4t1urupwL8xbAcWRePeETvshvy80f6WV1A@mail.gmail.com
Discussion: https://postgr.es/m/E1oJBIf-0006sw-SA@gemulon.postgresql.org
This commit is contained in:
Amit Kapila
2022-08-12 11:28:54 +05:30
parent eeb1671b11
commit 15014b8232
26 changed files with 125 additions and 245 deletions

View File

@ -2648,6 +2648,50 @@ sub wait_for_slot_catchup
=pod =pod
=item $node->wait_for_subscription_sync(publisher, subname, dbname)
Wait for all tables in pg_subscription_rel to complete the initial
synchronization (i.e to be either in 'syncdone' or 'ready' state).
If the publisher node is given, additionally, check if the subscriber has
caught up to what has been committed on the primary. This is useful to
ensure that the initial data synchronization has been completed after
creating a new subscription.
If there is no active replication connection from this peer, wait until
poll_query_until timeout.
This is not a test. It die()s on failure.
=cut
sub wait_for_subscription_sync
{
my ($self, $publisher, $subname, $dbname) = @_;
my $name = $self->name;
$dbname = defined($dbname) ? $dbname : 'postgres';
# Wait for all tables to finish initial sync.
print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
my $query =
qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');];
$self->poll_query_until($dbname, $query)
or croak "timed out waiting for subscriber to synchronize data";
# Then, wait for the replication to catchup if required.
if (defined($publisher))
{
croak 'subscription name must be specified' unless defined($subname);
$publisher->wait_for_catchup($subname);
}
print "done\n";
return;
}
=pod
=item $node->wait_for_log(regexp, offset) =item $node->wait_for_log(regexp, offset)
Waits for the contents of the server log file, starting at the given offset, to Waits for the contents of the server log file, starting at the given offset, to

View File

@ -102,13 +102,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
); );
$node_publisher->wait_for_catchup('tap_sub'); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
@ -237,13 +232,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2" "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
); );
$node_publisher->wait_for_catchup('tap_sub_temp1'); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_temp1');
# Also wait for initial table sync to finish
$synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Subscriber table will have no rows initially # Subscriber table will have no rows initially
$result = $result =

View File

@ -114,13 +114,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
); );
$node_publisher->wait_for_catchup('tap_sub');
# Wait for initial sync to finish as well # Wait for initial sync to finish as well
my $synced_query = $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Insert initial test data # Insert initial test data
$node_publisher->safe_psql( $node_publisher->safe_psql(

View File

@ -39,13 +39,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
); );
$node_publisher->wait_for_catchup('tap_sub'); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
@ -71,8 +66,7 @@ $node_subscriber->poll_query_until('postgres', $started_query)
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
# wait for sync to finish this time # wait for sync to finish this time
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
# check that all data is synced # check that all data is synced
$result = $result =
@ -107,8 +101,7 @@ $node_subscriber->safe_psql('postgres',
); );
# and wait for data sync to finish again # and wait for data sync to finish again
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
# check that all data is synced # check that all data is synced
$result = $result =
@ -133,8 +126,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
# wait for sync to finish # wait for sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rep_next"); "SELECT count(*) FROM tab_rep_next");

View File

@ -32,13 +32,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
); );
$node_publisher->wait_for_catchup('mysub'); # Wait for initial sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
# Wait for initial sync to finish as well
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8 q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8

View File

@ -28,13 +28,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
); );
$node_publisher->wait_for_catchup('mysub'); # Wait for initial sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
# Wait for initial sync to finish as well
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});

View File

@ -49,13 +49,8 @@ ok( $stderr =~
m/WARNING: publication "non_existent_pub" does not exist in the publisher/, m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
"Create subscription throws warning for non-existent publication"); "Create subscription throws warning for non-existent publication");
$node_publisher->wait_for_catchup('mysub1'); # Wait for initial table sync to finish.
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1');
# Also wait for initial table sync to finish.
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Specifying non-existent publication along with add publication. # Specifying non-existent publication along with add publication.
($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',

View File

@ -38,13 +38,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
); );
$node_publisher->wait_for_catchup('tap_sub'); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',
@ -105,8 +100,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
# Add replica identity column. (The serial is not necessary, but it's # Add replica identity column. (The serial is not necessary, but it's
# a convenient way to get a default on the new column so that rows # a convenient way to get a default on the new column so that rows

View File

@ -67,10 +67,7 @@ $node_subscriber->safe_psql('postgres',
); );
# Wait for initial sync of all subscriptions # Wait for initial sync of all subscriptions
my $synced_query = $node_subscriber->wait_for_subscription_sync;
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# insert data to truncate # insert data to truncate
@ -211,8 +208,7 @@ $node_subscriber->safe_psql('postgres',
); );
# wait for initial data sync # wait for initial data sync
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
# insert data to truncate # insert data to truncate

View File

@ -40,10 +40,7 @@ $node_subscriber->safe_psql('postgres',
); );
# Wait for initial sync of all subscriptions # Wait for initial sync of all subscriptions
my $synced_query = $node_subscriber->wait_for_subscription_sync;
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
is( $result, qq(1|22 is( $result, qq(1|22

View File

@ -153,12 +153,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
}); });
# Wait for initial sync of all subscriptions # Wait for initial sync of all subscriptions
my $synced_query = $node_subscriber1->wait_for_subscription_sync;
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; $node_subscriber2->wait_for_subscription_sync;
$node_subscriber1->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
$node_subscriber2->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Tests for replication using leaf partition identity and schema # Tests for replication using leaf partition identity and schema
@ -490,10 +486,8 @@ $node_subscriber2->safe_psql('postgres',
"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all"); "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
# Wait for initial sync of all subscriptions # Wait for initial sync of all subscriptions
$node_subscriber1->poll_query_until('postgres', $synced_query) $node_subscriber1->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data"; $node_subscriber2->wait_for_subscription_sync;
$node_subscriber2->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# check that data is synced correctly # check that data is synced correctly
$result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2"); $result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
@ -568,8 +562,7 @@ $node_subscriber2->safe_psql('postgres',
# make sure the subscription on the second subscriber is synced, before # make sure the subscription on the second subscriber is synced, before
# continuing # continuing
$node_subscriber2->poll_query_until('postgres', $synced_query) $node_subscriber2->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
# Insert a change into the leaf partition, should be replicated through # Insert a change into the leaf partition, should be replicated through
# the partition root (thanks to the FOR ALL TABLES partition). # the partition root (thanks to the FOR ALL TABLES partition).
@ -824,8 +817,7 @@ $node_subscriber2->safe_psql(
$node_subscriber2->safe_psql('postgres', $node_subscriber2->safe_psql('postgres',
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
$node_subscriber2->poll_query_until('postgres', $synced_query) $node_subscriber2->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
# Make partition map cache # Make partition map cache
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");

View File

@ -46,10 +46,7 @@ $node_subscriber->safe_psql('postgres',
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
# Ensure nodes are in sync with each other # Ensure nodes are in sync with each other
$node_publisher->wait_for_catchup('tsub'); $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
$node_subscriber->poll_query_until('postgres',
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
) or die "Timed out while waiting for subscriber to synchronize data";
# Insert some content and make sure it's replicated across # Insert some content and make sure it's replicated across
$node_publisher->safe_psql( $node_publisher->safe_psql(

View File

@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
); );
$node_publisher->wait_for_catchup($appname); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',

View File

@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
); );
$node_publisher->wait_for_catchup($appname); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',

View File

@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
); );
$node_publisher->wait_for_catchup($appname); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',

View File

@ -40,13 +40,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
); );
$node_publisher->wait_for_catchup($appname); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',

View File

@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
); );
$node_publisher->wait_for_catchup($appname); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result = my $result =
$node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres',

View File

@ -53,14 +53,8 @@ $node_subscriber->safe_psql(
PUBLICATION tap_pub PUBLICATION tap_pub
WITH (two_phase = on)"); WITH (two_phase = on)");
# Wait for subscriber to finish initialization # Wait for initial table sync to finish
$node_publisher->wait_for_catchup($appname); $node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Also wait for two-phase to be enabled # Also wait for two-phase to be enabled
my $twophase_query = my $twophase_query =
@ -331,12 +325,8 @@ $node_subscriber->safe_psql(
PUBLICATION tap_pub_copy PUBLICATION tap_pub_copy
WITH (two_phase=on, copy_data=false);"); WITH (two_phase=on, copy_data=false);");
# Wait for subscriber to finish initialization # Wait for initial table sync to finish
$node_publisher->wait_for_catchup($appname_copy); $node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
# Also wait for initial table sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Also wait for two-phase to be enabled # Also wait for two-phase to be enabled
$node_subscriber->poll_query_until('postgres', $twophase_query) $node_subscriber->poll_query_until('postgres', $twophase_query)

View File

@ -55,14 +55,8 @@ $node_subscriber->safe_psql(
PUBLICATION tap_pub PUBLICATION tap_pub
WITH (streaming = on, two_phase = on)"); WITH (streaming = on, two_phase = on)");
# Wait for subscriber to finish initialization # Wait for initial table sync to finish
$node_publisher->wait_for_catchup($appname); $node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Also wait for two-phase to be enabled # Also wait for two-phase to be enabled
my $twophase_query = my $twophase_query =

View File

@ -37,13 +37,7 @@ $node_subscriber->safe_psql('postgres',
); );
# Wait for initial table sync to finish # Wait for initial table sync to finish
my $synced_query = $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->wait_for_catchup('tap_sub');
# Check the initial data of tab_1 is copied to subscriber # Check the initial data of tab_1 is copied to subscriber
my $result = $node_subscriber->safe_psql('postgres', my $result = $node_subscriber->safe_psql('postgres',
@ -67,10 +61,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1"); "ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
# Wait for initial table sync to finish # Wait for initial table sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->wait_for_catchup('tap_sub');
# Check the initial data of tab_drop_refresh was copied to subscriber # Check the initial data of tab_drop_refresh was copied to subscriber
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
@ -82,10 +73,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1"); "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
# Wait for initial table sync to finish # Wait for initial table sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->wait_for_catchup('tap_sub');
# Check the initial data of tab_1 was copied to subscriber again # Check the initial data of tab_1 was copied to subscriber again
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',

View File

@ -62,13 +62,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema" "CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
); );
$node_publisher->wait_for_catchup('tap_sub_schema'); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_schema');
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Check the schema table data is synced up # Check the schema table data is synced up
my $result = $node_subscriber->safe_psql('postgres', my $result = $node_subscriber->safe_psql('postgres',
@ -123,8 +118,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION"); "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
# Wait for sync to finish # Wait for sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)"); $node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
@ -158,8 +152,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION"); "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
# Wait for sync to finish # Wait for sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')" "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
@ -183,8 +176,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION"); "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
# Wait for sync to finish # Wait for sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
$result = $node_subscriber->safe_psql('postgres', $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')" "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"

View File

@ -153,13 +153,8 @@ SET SESSION AUTHORIZATION regress_admin;
CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice; CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
)); ));
$node_publisher->wait_for_catchup('admin_sub'); # Wait for initial sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub');
# Wait for initial sync to finish as well
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Verify that "regress_admin" can replicate into the tables # Verify that "regress_admin" can replicate into the tables
# #

View File

@ -17,9 +17,6 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start; $node_subscriber->start;
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $appname = 'tap_sub'; my $appname = 'tap_sub';
@ -48,10 +45,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
); );
$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish # wait for initial table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
or die "Timed out while waiting for subscriber to synchronize data";
# The subscription of the FOR ALL TABLES publication means there should be no # The subscription of the FOR ALL TABLES publication means there should be no
# filtering on the tablesync COPY, so all expect all 5 will be present. # filtering on the tablesync COPY, so all expect all 5 will be present.
@ -133,10 +128,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
); );
$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish # wait for initial table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
or die "Timed out while waiting for subscriber to synchronize data";
# The subscription of the ALL TABLES IN SCHEMA publication means there should be # The subscription of the ALL TABLES IN SCHEMA publication means there should be
# no filtering on the tablesync COPY, so expect all 5 will be present. # no filtering on the tablesync COPY, so expect all 5 will be present.
@ -397,11 +390,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
); );
$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish # wait for initial table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
or die "Timed out while waiting for subscriber to synchronize data";
# Check expected replicated rows for tab_rowfilter_1 # Check expected replicated rows for tab_rowfilter_1
# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered') # tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
@ -622,8 +612,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)"); "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
# wait for table synchronization to finish # wait for table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query) $node_subscriber->wait_for_subscription_sync;
or die "Timed out while waiting for subscriber to synchronize data";
$node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)" "INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"

View File

@ -124,10 +124,7 @@ $node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
# Wait for the data to replicate. # Wait for the data to replicate.
$node_publisher->wait_for_catchup('sub'); $node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
$node_subscriber->poll_query_until('postgres',
"SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
);
# Confirm that we have finished the table sync. # Confirm that we have finished the table sync.
my $result = my $result =

View File

@ -22,18 +22,6 @@ $node_subscriber->start;
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $offset = 0; my $offset = 0;
sub wait_for_subscription_sync
{
my ($node) = @_;
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
}
# setup tables on both nodes # setup tables on both nodes
# tab1: simple 1:1 replication # tab1: simple 1:1 replication
@ -160,7 +148,7 @@ $node_subscriber->safe_psql(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
# tab1: only (a,b) is replicated # tab1: only (a,b) is replicated
$result = $result =
@ -333,7 +321,7 @@ $node_subscriber->safe_psql('postgres',
# wait for the tablesync to complete, add a bit more data and then check # wait for the tablesync to complete, add a bit more data and then check
# the results of the replication # the results of the replication
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -385,9 +373,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3 ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1');
$node_publisher->wait_for_catchup('sub1');
# insert data and make sure the columns in column list get fully replicated # insert data and make sure the columns in column list get fully replicated
$node_publisher->safe_psql( $node_publisher->safe_psql(
@ -428,7 +414,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4 ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -465,7 +451,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -504,7 +490,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5 ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -621,7 +607,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6 ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -687,7 +673,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7 ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -758,7 +744,7 @@ $node_subscriber->safe_psql(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -795,7 +781,7 @@ $node_subscriber->safe_psql(
TRUNCATE test_part_c; TRUNCATE test_part_c;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -855,7 +841,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9 ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -898,7 +884,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -938,7 +924,7 @@ $node_subscriber->safe_psql(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -985,7 +971,7 @@ $node_subscriber->safe_psql(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -1034,7 +1020,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2; ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -1058,7 +1044,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1; ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -1102,7 +1088,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3; ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(
@ -1150,7 +1136,7 @@ $node_subscriber->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4; ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
)); ));
wait_for_subscription_sync($node_subscriber); $node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql( $node_publisher->safe_psql(
'postgres', qq( 'postgres', qq(

View File

@ -144,12 +144,7 @@ $node_twoways->safe_psql('d2',
# We cannot rely solely on wait_for_catchup() here; it isn't sufficient # We cannot rely solely on wait_for_catchup() here; it isn't sufficient
# when tablesync workers might still be running. So in addition to that, # when tablesync workers might still be running. So in addition to that,
# verify that tables are synced. # verify that tables are synced.
# XXX maybe this should be integrated in wait_for_catchup() itself. $node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2');
$node_twoways->wait_for_catchup('testsub');
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_twoways->poll_query_until('d2', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
$rows * 2, "2x$rows rows in t"); $rows * 2, "2x$rows rows in t");
@ -278,11 +273,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
); );
$node_publisher->wait_for_catchup('tap_sub'); # Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Also wait for initial table sync to finish
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
is( $node_subscriber->safe_psql( is( $node_subscriber->safe_psql(
'postgres', "SELECT * FROM tab_replidentity_index"), 'postgres', "SELECT * FROM tab_replidentity_index"),