mirror of
https://github.com/postgres/postgres.git
synced 2025-04-27 22:56:53 +03:00
Back-Patch "Add wait_for_subscription_sync for TAP tests."
This was originally done in commit 0c20dd33db for 16 only, to eliminate duplicate code and as an infrastructure that makes it easier to write future tests. However, it has been suggested that it would be good to back-patch this testing infrastructure to aid future tests in back-branches. Backpatch to all supported versions. Author: Masahiko Sawada Reviewed by: Amit Kapila, Shi yu Discussion: https://postgr.es/m/CAD21AoC-fvAkaKHa4t1urupwL8xbAcWRePeETvshvy80f6WV1A@mail.gmail.com Discussion: https://postgr.es/m/E1oJBIf-0006sw-SA@gemulon.postgresql.org
This commit is contained in:
parent
547b963683
commit
5afa63f0ae
@ -2195,6 +2195,50 @@ sub wait_for_slot_catchup
|
|||||||
|
|
||||||
=pod
|
=pod
|
||||||
|
|
||||||
|
=item $node->wait_for_subscription_sync(publisher, subname, dbname)
|
||||||
|
|
||||||
|
Wait for all tables in pg_subscription_rel to complete the initial
|
||||||
|
synchronization (i.e to be either in 'syncdone' or 'ready' state).
|
||||||
|
|
||||||
|
If the publisher node is given, additionally, check if the subscriber has
|
||||||
|
caught up to what has been committed on the primary. This is useful to
|
||||||
|
ensure that the initial data synchronization has been completed after
|
||||||
|
creating a new subscription.
|
||||||
|
|
||||||
|
If there is no active replication connection from this peer, wait until
|
||||||
|
poll_query_until timeout.
|
||||||
|
|
||||||
|
This is not a test. It die()s on failure.
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub wait_for_subscription_sync
|
||||||
|
{
|
||||||
|
my ($self, $publisher, $subname, $dbname) = @_;
|
||||||
|
my $name = $self->name;
|
||||||
|
|
||||||
|
$dbname = defined($dbname) ? $dbname : 'postgres';
|
||||||
|
|
||||||
|
# Wait for all tables to finish initial sync.
|
||||||
|
print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
|
||||||
|
my $query =
|
||||||
|
qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');];
|
||||||
|
$self->poll_query_until($dbname, $query)
|
||||||
|
or croak "timed out waiting for subscriber to synchronize data";
|
||||||
|
|
||||||
|
# Then, wait for the replication to catchup if required.
|
||||||
|
if (defined($publisher))
|
||||||
|
{
|
||||||
|
croak 'subscription name must be specified' unless defined($subname);
|
||||||
|
$publisher->wait_for_catchup($subname);
|
||||||
|
}
|
||||||
|
|
||||||
|
print "done\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
=pod
|
||||||
|
|
||||||
=item $node->wait_for_log(regexp, offset)
|
=item $node->wait_for_log(regexp, offset)
|
||||||
|
|
||||||
Waits for the contents of the server log file, starting at the given offset, to
|
Waits for the contents of the server log file, starting at the given offset, to
|
||||||
|
@ -85,13 +85,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
|
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
|
||||||
|
@ -111,13 +111,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Wait for initial sync to finish as well
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Insert initial test data
|
# Insert initial test data
|
||||||
$node_publisher->safe_psql(
|
$node_publisher->safe_psql(
|
||||||
|
@ -36,13 +36,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
|
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
|
||||||
@ -68,8 +63,7 @@ $node_subscriber->poll_query_until('postgres', $started_query)
|
|||||||
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
|
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
|
||||||
|
|
||||||
# wait for sync to finish this time
|
# wait for sync to finish this time
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# check that all data is synced
|
# check that all data is synced
|
||||||
$result =
|
$result =
|
||||||
@ -104,8 +98,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# and wait for data sync to finish again
|
# and wait for data sync to finish again
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# check that all data is synced
|
# check that all data is synced
|
||||||
$result =
|
$result =
|
||||||
@ -130,8 +123,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
||||||
|
|
||||||
# wait for sync to finish
|
# wait for sync to finish
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$result = $node_subscriber->safe_psql('postgres',
|
$result = $node_subscriber->safe_psql('postgres',
|
||||||
"SELECT count(*) FROM tab_rep_next");
|
"SELECT count(*) FROM tab_rep_next");
|
||||||
|
@ -29,13 +29,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('mysub');
|
# Wait for initial sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
|
||||||
# Wait for initial sync to finish as well
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->safe_psql('postgres',
|
$node_publisher->safe_psql('postgres',
|
||||||
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
|
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
|
||||||
|
@ -25,13 +25,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('mysub');
|
# Wait for initial sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
|
||||||
# Wait for initial sync to finish as well
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
$node_publisher->safe_psql('postgres',
|
$node_publisher->safe_psql('postgres',
|
||||||
q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
|
q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
|
||||||
|
@ -35,13 +35,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result =
|
my $result =
|
||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
@ -102,8 +97,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
|
|||||||
$node_subscriber->safe_psql('postgres',
|
$node_subscriber->safe_psql('postgres',
|
||||||
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
|
||||||
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Add replica identity column. (The serial is not necessary, but it's
|
# Add replica identity column. (The serial is not necessary, but it's
|
||||||
# a convenient way to get a default on the new column so that rows
|
# a convenient way to get a default on the new column so that rows
|
||||||
|
@ -64,10 +64,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
my $synced_query =
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# insert data to truncate
|
# insert data to truncate
|
||||||
|
|
||||||
@ -180,8 +177,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# wait for initial data sync
|
# wait for initial data sync
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# insert data to truncate
|
# insert data to truncate
|
||||||
|
|
||||||
|
@ -37,10 +37,7 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
);
|
);
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
my $synced_query =
|
$node_subscriber->wait_for_subscription_sync;
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
|
my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
|
||||||
is( $result, qq(1|22
|
is( $result, qq(1|22
|
||||||
|
@ -150,12 +150,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
|
|||||||
});
|
});
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
my $synced_query =
|
$node_subscriber1->wait_for_subscription_sync;
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
$node_subscriber1->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Tests for replication using leaf partition identity and schema
|
# Tests for replication using leaf partition identity and schema
|
||||||
|
|
||||||
@ -480,10 +476,8 @@ $node_subscriber2->safe_psql('postgres',
|
|||||||
"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
|
"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
|
||||||
|
|
||||||
# Wait for initial sync of all subscriptions
|
# Wait for initial sync of all subscriptions
|
||||||
$node_subscriber1->poll_query_until('postgres', $synced_query)
|
$node_subscriber1->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# check that data is synced correctly
|
# check that data is synced correctly
|
||||||
$result = $node_subscriber1->safe_psql('postgres',
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
@ -554,8 +548,7 @@ $node_subscriber2->safe_psql('postgres',
|
|||||||
|
|
||||||
# make sure the subscription on the second subscriber is synced, before
|
# make sure the subscription on the second subscriber is synced, before
|
||||||
# continuing
|
# continuing
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Insert a change into the leaf partition, should be replicated through
|
# Insert a change into the leaf partition, should be replicated through
|
||||||
# the partition root (thanks to the FOR ALL TABLES partition).
|
# the partition root (thanks to the FOR ALL TABLES partition).
|
||||||
@ -810,8 +803,7 @@ $node_subscriber2->safe_psql(
|
|||||||
$node_subscriber2->safe_psql('postgres',
|
$node_subscriber2->safe_psql('postgres',
|
||||||
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
|
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
|
||||||
|
|
||||||
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
$node_subscriber2->wait_for_subscription_sync;
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
# Make partition map cache
|
# Make partition map cache
|
||||||
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
|
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
|
||||||
|
@ -142,12 +142,7 @@ $node_twoways->safe_psql(
|
|||||||
# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
|
# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
|
||||||
# when tablesync workers might still be running. So in addition to that,
|
# when tablesync workers might still be running. So in addition to that,
|
||||||
# verify that tables are synced.
|
# verify that tables are synced.
|
||||||
# XXX maybe this should be integrated in wait_for_catchup() itself.
|
$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2');
|
||||||
$node_twoways->wait_for_catchup('testsub');
|
|
||||||
my $synced_query =
|
|
||||||
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
|
||||||
$node_twoways->poll_query_until('d2', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
|
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
|
||||||
$rows * 2, "2x$rows rows in t");
|
$rows * 2, "2x$rows rows in t");
|
||||||
@ -207,11 +202,8 @@ $node_subscriber->safe_psql('postgres',
|
|||||||
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||||
);
|
);
|
||||||
|
|
||||||
$node_publisher->wait_for_catchup('tap_sub');
|
# Wait for initial table sync to finish
|
||||||
|
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||||
# Also wait for initial table sync to finish
|
|
||||||
$node_subscriber->poll_query_until('postgres', $synced_query)
|
|
||||||
or die "Timed out while waiting for subscriber to synchronize data";
|
|
||||||
|
|
||||||
is( $node_subscriber->safe_psql(
|
is( $node_subscriber->safe_psql(
|
||||||
'postgres', "SELECT * FROM tab_replidentity_index"),
|
'postgres', "SELECT * FROM tab_replidentity_index"),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user