mirror of
https://github.com/postgres/postgres.git
synced 2025-05-17 06:41:24 +03:00
Add wait_for_subscription_sync for TAP tests.
The TAP tests for logical replication in src/test/subscription are using the following code in many places to make sure that the subscription is synchronized with the publisher: $node_publisher->wait_for_catchup('tap_sub'); $node_subscriber->poll_query_until('postgres', qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's')]); The new function wait_for_subscription_sync() can be used to replace the above code. This eliminates duplicated code and makes it easier to write future tests. Author: Masahiko Sawada Reviewed by: Amit Kapila, Shi yu Discussion: https://postgr.es/m/CAD21AoC-fvAkaKHa4t1urupwL8xbAcWRePeETvshvy80f6WV1A@mail.gmail.com
This commit is contained in:
parent
c67c2e2a29
commit
0c20dd33db
@ -2648,6 +2648,50 @@ sub wait_for_slot_catchup
|
|||||||
|
|
||||||
=pod
|
=pod
|
||||||
|
|
||||||
|
=item $node->wait_for_subscription_sync(publisher, subname, dbname)
|
||||||
|
|
||||||
|
Wait for all tables in pg_subscription_rel to complete the initial
|
||||||
|
synchronization (i.e to be either in 'syncdone' or 'ready' state).
|
||||||
|
|
||||||
|
If the publisher node is given, additionally, check if the subscriber has
|
||||||
|
caught up to what has been committed on the primary. This is useful to
|
||||||
|
ensure that the initial data synchronization has been completed after
|
||||||
|
creating a new subscription.
|
||||||
|
|
||||||
|
If there is no active replication connection from this peer, wait until
|
||||||
|
poll_query_until timeout.
|
||||||
|
|
||||||
|
This is not a test. It die()s on failure.
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub wait_for_subscription_sync
|
||||||
|
{
|
||||||
|
my ($self, $publisher, $subname, $dbname) = @_;
|
||||||
|
my $name = $self->name;
|
||||||
|
|
||||||
|
$dbname = defined($dbname) ? $dbname : 'postgres';
|
||||||
|
|
||||||
|
# Wait for all tables to finish initial sync.
|
||||||
|
print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
|
||||||
|
my $query =
|
||||||
|
qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');];
|
||||||
|
$self->poll_query_until($dbname, $query)
|
||||||
|
or croak "timed out waiting for subscriber to synchronize data";
|
||||||
|
|
||||||
|
# Then, wait for the replication to catchup if required.
|
||||||
|
if (defined($publisher))
|
||||||
|
{
|
||||||
|
croak 'subscription name must be specified' unless defined($subname);
|
||||||
|
$publisher->wait_for_catchup($subname);
|
||||||
|
}
|
||||||
|
|
||||||
|
print "done\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
=pod
|
||||||
|
|
||||||
=item $node->wait_for_log(regexp, offset)
|
=item $node->wait_for_log(regexp, offset)
|
||||||
|
|
||||||
Waits for the contents of the server log file, starting at the given offset, to
|
Waits for the contents of the server log file, starting at the given offset, to
|
||||||
|
@ -102,13 +102,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
|
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
|
||||||
@ -237,13 +232,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
|
"CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub_temp1');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_temp1');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
$synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Subscriber table will have no rows initially
|
# Subscriber table will have no rows initially
|
||||||
$result =
|
$result =
|
||||||
|
@ -114,13 +114,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
|
||||||
|
|
||||||
# Wait for initial sync to finish as well
|
# Wait for initial sync to finish as well
|
||||||
my $synced_query =
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Insert initial test data
|
# Insert initial test data
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
|
@ -39,13 +39,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
|
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
|
||||||
@ -71,8 +66,7 @@ $node_subscriber->poll_query_until('postgres', $started_query)
|
|||||||
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
|
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
|
||||||
|
|
||||||
# wait for sync to finish this time
|
# wait for sync to finish this time
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# check that all data is synced
|
# check that all data is synced
|
||||||
$result =
|
$result =
|
||||||
@ -107,8 +101,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# and wait for data sync to finish again
|
# and wait for data sync to finish again
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# check that all data is synced
|
# check that all data is synced
|
||||||
$result =
|
$result =
|
||||||
@ -133,8 +126,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
||||||
|
|
||||||
# wait for sync to finish
|
# wait for sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$result = $node_subscriber->safe_psql('postgres',
|
$result = $node_subscriber->safe_psql('postgres',
|
||||||
"SELECT count(*) FROM tab_rep_next");
|
"SELECT count(*) FROM tab_rep_next");
|
||||||
|
@ -32,13 +32,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('mysub');
|
# Wait for initial sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
|
||||||
# Wait for initial sync to finish as well
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->safe_psql('postgres',
|
$node_publisher->safe_psql('postgres',
|
||||||
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
|
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
|
||||||
|
@ -28,13 +28,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('mysub');
|
# Wait for initial sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
|
||||||
# Wait for initial sync to finish as well
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->safe_psql('postgres',
|
$node_publisher->safe_psql('postgres',
|
||||||
q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
|
q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
|
||||||
|
@ -49,13 +49,8 @@ ok( $stderr =~
|
|||||||
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
|
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
|
||||||
"Create subscription throws warning for non-existent publication");
|
"Create subscription throws warning for non-existent publication");
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('mysub1');
|
# Wait for initial table sync to finish.
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1');
|
||||||
# Also wait for initial table sync to finish.
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Specifying non-existent publication along with add publication.
|
# Specifying non-existent publication along with add publication.
|
||||||
($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
|
($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
|
||||||
|
@ -38,13 +38,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
@ -105,8 +100,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
|
|||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
||||||
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Add replica identity column. (The serial is not necessary, but it's
|
# Add replica identity column. (The serial is not necessary, but it's
|
||||||
# a convenient way to get a default on the new column so that rows
|
# a convenient way to get a default on the new column so that rows
|
||||||
|
@ -67,10 +67,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
my $synced_query =
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# insert data to truncate
|
# insert data to truncate
|
||||||
|
|
||||||
@ -211,8 +208,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# wait for initial data sync
|
# wait for initial data sync
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# insert data to truncate
|
# insert data to truncate
|
||||||
|
|
||||||
|
@ -40,10 +40,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
my $synced_query =
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
|
my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
|
||||||
is( $result, qq(1|22
|
is( $result, qq(1|22
|
||||||
|
@ -153,12 +153,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
|
|||||||
});
|
});
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
my $synced_query =
|
$node_subscriber1->wait_for_subscription_sync;
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
$node_subscriber1->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Tests for replication using leaf partition identity and schema
|
# Tests for replication using leaf partition identity and schema
|
||||||
|
|
||||||
@ -490,10 +486,8 @@ $node_subscriber2->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
|
"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
$node_subscriber1->poll_query_until('postgres', $synced_query)
|
$node_subscriber1->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# check that data is synced correctly
|
# check that data is synced correctly
|
||||||
$result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
|
$result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
|
||||||
@ -568,8 +562,7 @@ $node_subscriber2->safe_psql('postgres',
|
|||||||
|
|
||||||
# make sure the subscription on the second subscriber is synced, before
|
# make sure the subscription on the second subscriber is synced, before
|
||||||
# continuing
|
# continuing
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Insert a change into the leaf partition, should be replicated through
|
# Insert a change into the leaf partition, should be replicated through
|
||||||
# the partition root (thanks to the FOR ALL TABLES partition).
|
# the partition root (thanks to the FOR ALL TABLES partition).
|
||||||
@ -824,8 +817,7 @@ $node_subscriber2->safe_psql(
|
|||||||
$node_subscriber2->safe_psql('postgres',
|
$node_subscriber2->safe_psql('postgres',
|
||||||
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
|
||||||
|
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Make partition map cache
|
# Make partition map cache
|
||||||
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
|
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
|
||||||
|
@ -46,10 +46,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
|
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
|
||||||
|
|
||||||
# Ensure nodes are in sync with each other
|
# Ensure nodes are in sync with each other
|
||||||
$node_publisher->wait_for_catchup('tsub');
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
|
||||||
$node_subscriber->poll_query_until('postgres',
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
|
|
||||||
) or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Insert some content and make sure it's replicated across
|
# Insert some content and make sure it's replicated across
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
|
@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
@ -40,13 +40,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
|
@ -53,14 +53,8 @@ $node_subscriber->safe_psql(
|
|||||||
PUBLICATION tap_pub
|
PUBLICATION tap_pub
|
||||||
WITH (two_phase = on)");
|
WITH (two_phase = on)");
|
||||||
|
|
||||||
# Wait for subscriber to finish initialization
|
# Wait for initial table sync to finish
|
||||||
$node_publisher->wait_for_catchup($appname);
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
|
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Also wait for two-phase to be enabled
|
# Also wait for two-phase to be enabled
|
||||||
my $twophase_query =
|
my $twophase_query =
|
||||||
@ -331,12 +325,8 @@ $node_subscriber->safe_psql(
|
|||||||
PUBLICATION tap_pub_copy
|
PUBLICATION tap_pub_copy
|
||||||
WITH (two_phase=on, copy_data=false);");
|
WITH (two_phase=on, copy_data=false);");
|
||||||
|
|
||||||
# Wait for subscriber to finish initialization
|
# Wait for initial table sync to finish
|
||||||
$node_publisher->wait_for_catchup($appname_copy);
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
|
||||||
|
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Also wait for two-phase to be enabled
|
# Also wait for two-phase to be enabled
|
||||||
$node_subscriber->poll_query_until('postgres', $twophase_query)
|
$node_subscriber->poll_query_until('postgres', $twophase_query)
|
||||||
|
@ -55,14 +55,8 @@ $node_subscriber->safe_psql(
|
|||||||
PUBLICATION tap_pub
|
PUBLICATION tap_pub
|
||||||
WITH (streaming = on, two_phase = on)");
|
WITH (streaming = on, two_phase = on)");
|
||||||
|
|
||||||
# Wait for subscriber to finish initialization
|
# Wait for initial table sync to finish
|
||||||
$node_publisher->wait_for_catchup($appname);
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
|
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Also wait for two-phase to be enabled
|
# Also wait for two-phase to be enabled
|
||||||
my $twophase_query =
|
my $twophase_query =
|
||||||
|
@ -37,13 +37,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# Wait for initial table sync to finish
|
# Wait for initial table sync to finish
|
||||||
my $synced_query =
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
|
||||||
|
|
||||||
# Check the initial data of tab_1 is copied to subscriber
|
# Check the initial data of tab_1 is copied to subscriber
|
||||||
my $result = $node_subscriber->safe_psql('postgres',
|
my $result = $node_subscriber->safe_psql('postgres',
|
||||||
@ -67,10 +61,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
|
"ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
|
||||||
|
|
||||||
# Wait for initial table sync to finish
|
# Wait for initial table sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
|
||||||
|
|
||||||
# Check the initial data of tab_drop_refresh was copied to subscriber
|
# Check the initial data of tab_drop_refresh was copied to subscriber
|
||||||
$result = $node_subscriber->safe_psql('postgres',
|
$result = $node_subscriber->safe_psql('postgres',
|
||||||
@ -82,10 +73,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
|
"ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
|
||||||
|
|
||||||
# Wait for initial table sync to finish
|
# Wait for initial table sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
|
||||||
|
|
||||||
# Check the initial data of tab_1 was copied to subscriber again
|
# Check the initial data of tab_1 was copied to subscriber again
|
||||||
$result = $node_subscriber->safe_psql('postgres',
|
$result = $node_subscriber->safe_psql('postgres',
|
||||||
|
@ -62,13 +62,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
|
"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub_schema');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_schema');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Check the schema table data is synced up
|
# Check the schema table data is synced up
|
||||||
my $result = $node_subscriber->safe_psql('postgres',
|
my $result = $node_subscriber->safe_psql('postgres',
|
||||||
@ -123,8 +118,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
|
||||||
|
|
||||||
# Wait for sync to finish
|
# Wait for sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
|
$node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
|
||||||
|
|
||||||
@ -158,8 +152,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
|
||||||
|
|
||||||
# Wait for sync to finish
|
# Wait for sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$result = $node_subscriber->safe_psql('postgres',
|
$result = $node_subscriber->safe_psql('postgres',
|
||||||
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
|
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
|
||||||
@ -183,8 +176,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
|
||||||
|
|
||||||
# Wait for sync to finish
|
# Wait for sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$result = $node_subscriber->safe_psql('postgres',
|
$result = $node_subscriber->safe_psql('postgres',
|
||||||
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
|
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
|
||||||
|
@ -153,13 +153,8 @@ SET SESSION AUTHORIZATION regress_admin;
|
|||||||
CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
|
CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
|
||||||
));
|
));
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('admin_sub');
|
# Wait for initial sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub');
|
||||||
# Wait for initial sync to finish as well
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Verify that "regress_admin" can replicate into the tables
|
# Verify that "regress_admin" can replicate into the tables
|
||||||
#
|
#
|
||||||
|
@ -17,9 +17,6 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
|
|||||||
$node_subscriber->init(allows_streaming => 'logical');
|
$node_subscriber->init(allows_streaming => 'logical');
|
||||||
$node_subscriber->start;
|
$node_subscriber->start;
|
||||||
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
|
|
||||||
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
||||||
my $appname = 'tap_sub';
|
my $appname = 'tap_sub';
|
||||||
|
|
||||||
@ -48,10 +45,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
|
||||||
# wait for initial table synchronization to finish
|
# wait for initial table synchronization to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# The subscription of the FOR ALL TABLES publication means there should be no
|
# The subscription of the FOR ALL TABLES publication means there should be no
|
||||||
# filtering on the tablesync COPY, so all expect all 5 will be present.
|
# filtering on the tablesync COPY, so all expect all 5 will be present.
|
||||||
@ -133,10 +128,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
|
||||||
# wait for initial table synchronization to finish
|
# wait for initial table synchronization to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# The subscription of the ALL TABLES IN SCHEMA publication means there should be
|
# The subscription of the ALL TABLES IN SCHEMA publication means there should be
|
||||||
# no filtering on the tablesync COPY, so expect all 5 will be present.
|
# no filtering on the tablesync COPY, so expect all 5 will be present.
|
||||||
@ -397,11 +390,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup($appname);
|
|
||||||
|
|
||||||
# wait for initial table synchronization to finish
|
# wait for initial table synchronization to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Check expected replicated rows for tab_rowfilter_1
|
# Check expected replicated rows for tab_rowfilter_1
|
||||||
# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
|
# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
|
||||||
@ -622,8 +612,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
|
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
|
||||||
|
|
||||||
# wait for table synchronization to finish
|
# wait for table synchronization to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->safe_psql('postgres',
|
$node_publisher->safe_psql('postgres',
|
||||||
"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
|
"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
|
||||||
|
@ -124,10 +124,7 @@ $node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
|
|||||||
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
|
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
|
||||||
|
|
||||||
# Wait for the data to replicate.
|
# Wait for the data to replicate.
|
||||||
$node_publisher->wait_for_catchup('sub');
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
|
||||||
$node_subscriber->poll_query_until('postgres',
|
|
||||||
"SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
|
|
||||||
);
|
|
||||||
|
|
||||||
# Confirm that we have finished the table sync.
|
# Confirm that we have finished the table sync.
|
||||||
my $result =
|
my $result =
|
||||||
|
@ -51,17 +51,9 @@ $node_A->safe_psql(
|
|||||||
PUBLICATION tap_pub_B
|
PUBLICATION tap_pub_B
|
||||||
WITH (origin = none, copy_data = off)");
|
WITH (origin = none, copy_data = off)");
|
||||||
|
|
||||||
# Wait for subscribers to finish initialization
|
# Wait for initial table sync to finish
|
||||||
$node_A->wait_for_catchup($appname_B1);
|
$node_A->wait_for_subscription_sync($node_B, $appname_A);
|
||||||
$node_B->wait_for_catchup($appname_A);
|
$node_B->wait_for_subscription_sync($node_A, $appname_B1);
|
||||||
|
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_A->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
$node_B->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
is(1, 1, 'Bidirectional replication setup is complete');
|
is(1, 1, 'Bidirectional replication setup is complete');
|
||||||
|
|
||||||
@ -126,10 +118,7 @@ $node_B->safe_psql(
|
|||||||
PUBLICATION tap_pub_C
|
PUBLICATION tap_pub_C
|
||||||
WITH (origin = none)");
|
WITH (origin = none)");
|
||||||
|
|
||||||
$node_C->wait_for_catchup($appname_B2);
|
$node_B->wait_for_subscription_sync($node_C, $appname_B2);
|
||||||
|
|
||||||
$node_B->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# insert a record
|
# insert a record
|
||||||
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
|
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
|
||||||
|
@ -22,18 +22,6 @@ $node_subscriber->start;
|
|||||||
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
||||||
my $offset = 0;
|
my $offset = 0;
|
||||||
|
|
||||||
sub wait_for_subscription_sync
|
|
||||||
{
|
|
||||||
my ($node) = @_;
|
|
||||||
|
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
|
|
||||||
$node->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
}
|
|
||||||
|
|
||||||
# setup tables on both nodes
|
# setup tables on both nodes
|
||||||
|
|
||||||
# tab1: simple 1:1 replication
|
# tab1: simple 1:1 replication
|
||||||
@ -160,7 +148,7 @@ $node_subscriber->safe_psql(
|
|||||||
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
|
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
# tab1: only (a,b) is replicated
|
# tab1: only (a,b) is replicated
|
||||||
$result =
|
$result =
|
||||||
@ -333,7 +321,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
|
|
||||||
# wait for the tablesync to complete, add a bit more data and then check
|
# wait for the tablesync to complete, add a bit more data and then check
|
||||||
# the results of the replication
|
# the results of the replication
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -385,9 +373,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1');
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('sub1');
|
|
||||||
|
|
||||||
# insert data and make sure the columns in column list get fully replicated
|
# insert data and make sure the columns in column list get fully replicated
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
@ -428,7 +414,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -465,7 +451,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
|
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -504,7 +490,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -621,7 +607,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -687,7 +673,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -758,7 +744,7 @@ $node_subscriber->safe_psql(
|
|||||||
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
|
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -795,7 +781,7 @@ $node_subscriber->safe_psql(
|
|||||||
TRUNCATE test_part_c;
|
TRUNCATE test_part_c;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -855,7 +841,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -898,7 +884,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
|
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -938,7 +924,7 @@ $node_subscriber->safe_psql(
|
|||||||
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
|
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -985,7 +971,7 @@ $node_subscriber->safe_psql(
|
|||||||
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
|
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -1034,7 +1020,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -1058,7 +1044,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -1102,7 +1088,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
@ -1150,7 +1136,7 @@ $node_subscriber->safe_psql(
|
|||||||
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
|
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
|
||||||
));
|
));
|
||||||
|
|
||||||
wait_for_subscription_sync($node_subscriber);
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
|
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
'postgres', qq(
|
'postgres', qq(
|
||||||
|
@ -144,12 +144,7 @@ $node_twoways->safe_psql('d2',
|
|||||||
# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
|
# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
|
||||||
# when tablesync workers might still be running. So in addition to that,
|
# when tablesync workers might still be running. So in addition to that,
|
||||||
# verify that tables are synced.
|
# verify that tables are synced.
|
||||||
# XXX maybe this should be integrated in wait_for_catchup() itself.
|
$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2');
|
||||||
$node_twoways->wait_for_catchup('testsub');
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_twoways->poll_query_until('d2', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
|
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
|
||||||
$rows * 2, "2x$rows rows in t");
|
$rows * 2, "2x$rows rows in t");
|
||||||
@ -278,11 +273,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
is( $node_subscriber->safe_psql(
|
is( $node_subscriber->safe_psql(
|
||||||
'postgres', "SELECT * FROM tab_replidentity_index"),
|
'postgres', "SELECT * FROM tab_replidentity_index"),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user