diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9ab954a6e01..c3126545b48 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -21,6 +21,7 @@ #include "utils/inval.h" #include "utils/int8.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -509,6 +510,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); + /* + * Skip tables that look like they are from a heap rewrite (see + * make_new_heap()). We need to skip them because the subscriber + * won't have a table by that name to receive the data. That + * means we won't ship the new data in, say, an added column with + * a DEFAULT, but if the user applies the same DDL manually on the + * subscriber, then this will work out for them. + * + * We only need to consider the alltables case, because such a + * transient heap won't be an explicit member of a publication. + */ + if (pub->alltables) + { + char *relname = get_rel_name(relid); + unsigned int u; + int n; + + if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 && + relname[n] == '\0') + { + if (get_rel_relkind(u) == RELKIND_RELATION) + break; + } + } + if (pub->alltables || list_member_oid(pubids, pub->oid)) { entry->pubactions.pubinsert |= pub->pubactions.pubinsert; diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl new file mode 100644 index 00000000000..5e3211aefaf --- /dev/null +++ b/src/test/subscription/t/006_rewrite.pl @@ -0,0 +1,73 @@ +# Test logical replication behavior with heap rewrites +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $appname = 'encoding_test'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;" +); + +wait_for_caught_up($node_publisher, $appname); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); + +wait_for_caught_up($node_publisher, $appname); + +is($node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}), + qq(1|one +2|two), + 'initial data replicated to subscriber'); + +# DDL that causes a heap rewrite +my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;"; +$node_subscriber->safe_psql('postgres', $ddl2); +$node_publisher->safe_psql('postgres', $ddl2); + +wait_for_caught_up($node_publisher, $appname); + +$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);}); + +wait_for_caught_up($node_publisher, $appname); + +is($node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}), + qq(1|one|0 +2|two|0 +3|three|33), + 'data replicated to subscriber'); + +$node_subscriber->stop; +$node_publisher->stop;