diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e25530e7b88..79735da21a9 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -701,6 +701,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) List *pubids = GetRelationPublications(relid); ListCell *lc; Oid publish_as_relid = relid; + int publish_ancestor_level = 0; bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); @@ -729,11 +730,28 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Publication *pub = lfirst(lc); bool publish = false; + /* + * Under what relid should we publish changes in this publication? + * We'll use the top-most relid across all publications. Also track + * the ancestor level for this publication. + */ + Oid pub_relid = relid; + int ancestor_level = 0; + + /* + * If this is a FOR ALL TABLES publication, pick the partition root + * and set the ancestor level accordingly. + */ if (pub->alltables) { publish = true; if (pub->pubviaroot && am_partition) - publish_as_relid = llast_oid(get_partition_ancestors(relid)); + { + List *ancestors = get_partition_ancestors(relid); + + pub_relid = llast_oid(ancestors); + ancestor_level = list_length(ancestors); + } } if (!publish) @@ -750,6 +768,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { List *ancestors = get_partition_ancestors(relid); ListCell *lc2; + int level = 0; /* * Find the "topmost" ancestor that is in this @@ -759,12 +778,17 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Oid ancestor = lfirst_oid(lc2); + level++; + if (list_member_oid(GetRelationPublications(ancestor), pub->oid)) { ancestor_published = true; if (pub->pubviaroot) - publish_as_relid = ancestor; + { + pub_relid = ancestor; + ancestor_level = level; + } } } } @@ -785,11 +809,21 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; - } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; + /* + * We want to publish the changes as the top-most ancestor + * across all publications. So we need to check if the + * already calculated level is higher than the new one. If + * yes, we can ignore the new value (as it's a child). + * Otherwise the new value is an ancestor, so we keep it. + */ + if (publish_ancestor_level > ancestor_level) + continue; + + /* The new value is an ancestor, so let's keep it. */ + publish_as_relid = pub_relid; + publish_ancestor_level = ancestor_level; + } } list_free(pubids); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 6d77001469b..7689cbb3643 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 63; +use Test::More tests => 67; # setup @@ -406,6 +406,12 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)"); $node_publisher->safe_psql('postgres', "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)"); # Note: tab3_1's parent is not in the publication, in which case its @@ -415,6 +421,9 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)" ); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)" +); # prepare data for the initial sync $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)"); @@ -459,10 +468,16 @@ $node_subscriber2->safe_psql('postgres', $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)" ); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab4 (a int PRIMARY KEY)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab4_1 (a int PRIMARY KEY)" +); # Publication that sub2 points to now publishes via root, so must update # subscription target relations. $node_subscriber2->safe_psql('postgres', - "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all"); # Wait for initial sync of all subscriptions $node_subscriber1->poll_query_until('postgres', $synced_query) @@ -483,6 +498,8 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (0), (3), (5)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (0), (3), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 VALUES (0)"); $node_publisher->wait_for_catchup('sub_viaroot'); $node_publisher->wait_for_catchup('sub2'); @@ -522,6 +539,43 @@ sub2_tab3|1 sub2_tab3|3 sub2_tab3|5), 'inserts into tab3 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4 ORDER BY 1"); +is( $result, qq(0), 'inserts into tab4 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4_1 ORDER BY 1"); +is( $result, qq(), 'inserts into tab4_1 replicated'); + +# now switch the order of publications in the list, try again, the result +# should be the same (no dependence on order of pulications) +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_all, pub_lower_level"); + +# make sure the subscription on the second subscriber is synced, before +# continuing +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert a change into the leaf partition, should be replicated through +# the partition root (thanks to the FOR ALL TABLES partition). +$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 VALUES (1)"); + +$node_publisher->wait_for_catchup('sub2'); + +# tab4 change should be replicated through the root partition, which +# maps to the tab4 relation on subscriber. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4 ORDER BY 1"); +is( $result, qq(0 +1), 'inserts into tab4 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4_1 ORDER BY 1"); +is( $result, qq(), 'inserts into tab4_1 replicated'); + + # update (replicated as update) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");