diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 25998fbb39b..789b895db89 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -277,16 +277,21 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, /* * Returns the relid of the topmost ancestor that is published via this - * publication if any, otherwise returns InvalidOid. + * publication if any and set its ancestor level to ancestor_level, + * otherwise returns InvalidOid. + * + * The ancestor_level value allows us to compare the results for multiple + * publications, and decide which value is higher up. * * Note that the list of ancestors should be ordered such that the topmost * ancestor is at the end of the list. */ Oid -GetTopMostAncestorInPublication(Oid puboid, List *ancestors) +GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level) { ListCell *lc; Oid topmost_relid = InvalidOid; + int level = 0; /* * Find the "topmost" ancestor that is in this publication. 
@@ -297,13 +302,25 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors) List *apubids = GetRelationPublications(ancestor); List *aschemaPubids = NIL; + level++; + if (list_member_oid(apubids, puboid)) + { topmost_relid = ancestor; + + if (ancestor_level) + *ancestor_level = level; + } else { aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); if (list_member_oid(aschemaPubids, puboid)) + { topmost_relid = ancestor; + + if (ancestor_level) + *ancestor_level = level; + } } list_free(apubids); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 16b8661a1b7..1aad2e769cb 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -323,7 +323,8 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, */ if (pubviaroot && relation->rd_rel->relispartition) { - publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors); + publish_as_relid + = GetTopMostAncestorInPublication(pubid, ancestors, NULL); if (!OidIsValid(publish_as_relid)) publish_as_relid = relid; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ea57a0477f0..d869f3e93eb 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1748,6 +1748,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; + int publish_ancestor_level = 0; bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; @@ -1815,11 +1816,28 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) Publication *pub = lfirst(lc); bool publish = false; + /* + * Under what relid should we publish changes in this publication? + * We'll use the top-most relid across all publications. 
Also track + * the ancestor level for this publication. + */ + Oid pub_relid = relid; + int ancestor_level = 0; + + /* + * If this is a FOR ALL TABLES publication, pick the partition root + * and set the ancestor level accordingly. + */ if (pub->alltables) { publish = true; if (pub->pubviaroot && am_partition) - publish_as_relid = llast_oid(get_partition_ancestors(relid)); + { + List *ancestors = get_partition_ancestors(relid); + + pub_relid = llast_oid(ancestors); + ancestor_level = list_length(ancestors); + } } if (!publish) @@ -1835,16 +1853,21 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (am_partition) { Oid ancestor; + int level; List *ancestors = get_partition_ancestors(relid); ancestor = GetTopMostAncestorInPublication(pub->oid, - ancestors); + ancestors, + &level); if (ancestor != InvalidOid) { ancestor_published = true; if (pub->pubviaroot) - publish_as_relid = ancestor; + { + pub_relid = ancestor; + ancestor_level = level; + } } } @@ -1868,6 +1891,20 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; rel_publications = lappend(rel_publications, pub); + + /* + * We want to publish the changes as the top-most ancestor + * across all publications. So we need to check if the + * already calculated level is higher than the new one. If + * yes, we can ignore the new value (as it's a child). + * Otherwise the new value is an ancestor, so we keep it. + */ + if (publish_ancestor_level > ancestor_level) + continue; + + /* The new value is an ancestor, so let's keep it. 
*/ + publish_as_relid = pub_relid; + publish_ancestor_level = ancestor_level; } } diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index ba72e62e614..fe773cf9b7d 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -134,7 +134,8 @@ extern List *GetAllSchemaPublicationRelations(Oid puboid, extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); -extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors); +extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, + int *ancestor_level); extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 5266471a7ae..66e63e755ef 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -409,6 +409,14 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)"); + $node_publisher->safe_psql('postgres', "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)"); # Note: tab3_1's parent is not in the publication, in which case its @@ -419,6 +427,11 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)" ); +# for tab4, we publish changes through the "middle" partitioned table 
+$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)" +); + # prepare data for the initial sync $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)"); @@ -462,10 +475,20 @@ $node_subscriber2->safe_psql('postgres', $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)" ); -# Publication that sub2 points to now publishes via root, so must update -# subscription target relations. + +# Note: We create two separate tables, not a partitioned one, so that we can +# easily identify through which relation were the changes replicated. $node_subscriber2->safe_psql('postgres', - "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + "CREATE TABLE tab4 (a int PRIMARY KEY)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab4_1 (a int PRIMARY KEY)" +); +# Publication that sub2 points to now publishes via root, so must update +# subscription target relations. We set the list of publications so that +# the FOR ALL TABLES publication is second (the list order matters). +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all"); # Wait for initial sync of all subscriptions $node_subscriber1->poll_query_until('postgres', $synced_query) @@ -487,6 +510,11 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (0), (3), (5)"); +# Insert a row into the leaf partition, should be replicated through the +# partition root (thanks to the FOR ALL TABLES publication). 
+$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 VALUES (0)"); + $node_publisher->wait_for_catchup('sub_viaroot'); $node_publisher->wait_for_catchup('sub2'); @@ -525,6 +553,46 @@ sub2_tab3|1 sub2_tab3|3 sub2_tab3|5), 'inserts into tab3 replicated'); +# tab4 change should be replicated through the root partition, which +# maps to the tab4 relation on subscriber. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4 ORDER BY 1"); +is( $result, qq(0), 'inserts into tab4 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4_1 ORDER BY 1"); +is( $result, qq(), 'inserts into tab4_1 replicated'); + + +# now switch the order of publications in the list, try again, the result +# should be the same (no dependence on order of publications) +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_all, pub_lower_level"); + +# make sure the subscription on the second subscriber is synced, before +# continuing +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert a change into the leaf partition, should be replicated through +# the partition root (thanks to the FOR ALL TABLES publication). +$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 VALUES (1)"); + +$node_publisher->wait_for_catchup('sub2'); + +# tab4 change should be replicated through the root partition, which +# maps to the tab4 relation on subscriber. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4 ORDER BY 1"); +is( $result, qq(0 +1), 'inserts into tab4 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4_1 ORDER BY 1"); +is( $result, qq(), 'inserts into tab4_1 replicated'); + + # update (replicated as update) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");