diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 5c7e9d11ac8..1fc34b18a44 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -249,6 +249,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel, } } +/* + * Check if replica identity matches and mark the updatable flag. + * + * We allow for stricter replica identity (fewer columns) on subscriber as + * that will not stop us from finding unique tuple. IE, if publisher has + * identity (id,timestamp) and subscriber just (id) this will not be a + * problem, but in the opposite scenario it will. + * + * We just mark the relation entry as not updatable here if the local + * replica identity is found to be insufficient for applying + * updates/deletes (inserts don't care!) and leave it to + * check_relation_updatable() to throw the actual error if needed. + */ +static void +logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) +{ + Bitmapset *idkey; + LogicalRepRelation *remoterel = &entry->remoterel; + int i; + + entry->updatable = true; + + idkey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + /* fallback to PK if no replica identity */ + if (idkey == NULL) + { + idkey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + /* + * If no replica identity index and no PK, the published table must + * have replica identity FULL. + */ + if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) + entry->updatable = false; + } + + i = -1; + while ((i = bms_next_member(idkey, i)) >= 0) + { + int attnum = i + FirstLowInvalidHeapAttributeNumber; + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication target relation \"%s.%s\" uses " + "system columns in REPLICA IDENTITY index", + remoterel->nspname, remoterel->relname))); + + attnum = AttrNumberGetAttrOffset(attnum); + + if (entry->attrmap->attnums[attnum] < 0 || + !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) + { + entry->updatable = false; + break; + } + } +} + /* * Open the local relation associated with the remote one. * @@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) if (!entry->localrelvalid) { Oid relid; - Bitmapset *idkey; TupleDesc desc; MemoryContext oldctx; int i; @@ -366,54 +426,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) bms_free(missingatts); /* - * Check that replica identity matches. We allow for stricter replica - * identity (fewer columns) on subscriber as that will not stop us - * from finding unique tuple. IE, if publisher has identity - * (id,timestamp) and subscriber just (id) this will not be a problem, - * but in the opposite scenario it will. - * - * Don't throw any error here just mark the relation entry as not - * updatable, as replica identity is only for updates and deletes but - * inserts can be replicated even without it. + * Set if the table's replica identity is enough to apply + * update/delete. */ - entry->updatable = true; - idkey = RelationGetIndexAttrBitmap(entry->localrel, - INDEX_ATTR_BITMAP_IDENTITY_KEY); - /* fallback to PK if no replica identity */ - if (idkey == NULL) - { - idkey = RelationGetIndexAttrBitmap(entry->localrel, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - - /* - * If no replica identity index and no PK, the published table - * must have replica identity FULL. - */ - if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) - entry->updatable = false; - } - - i = -1; - while ((i = bms_next_member(idkey, i)) >= 0) - { - int attnum = i + FirstLowInvalidHeapAttributeNumber; - - if (!AttrNumberIsForUserDefinedAttr(attnum)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication target relation \"%s.%s\" uses " - "system columns in REPLICA IDENTITY index", - remoterel->nspname, remoterel->relname))); - - attnum = AttrNumberGetAttrOffset(attnum); - - if (entry->attrmap->attnums[attnum] < 0 || - !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) - { - entry->updatable = false; - break; - } - } + logicalrep_rel_mark_updatable(entry); entry->localrelvalid = true; } @@ -651,7 +667,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, attrmap->maplen * sizeof(AttrNumber)); } - entry->updatable = root->updatable; + /* Set if the table's replica identity is enough to apply update/delete. */ + logicalrep_rel_mark_updatable(entry); entry->localrelvalid = true; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index bf97fa44ba2..8c9a4b50383 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1323,6 +1323,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata, static void check_relation_updatable(LogicalRepRelMapEntry *rel) { + /* + * For partitioned tables, we only need to care if the target partition is + * updatable (aka has PK or RI defined for it). + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + /* Updatable, no error. */ if (rel->updatable) return; @@ -1676,6 +1683,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot_part; TupleConversionMap *map; MemoryContext oldctx; + LogicalRepRelMapEntry *part_entry = NULL; + AttrMap *attrmap = NULL; /* ModifyTableState is needed for ExecFindPartition(). */ edata->mtstate = mtstate = makeNode(ModifyTableState); @@ -1707,8 +1716,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable); map = partrelinfo->ri_RootToPartitionMap; if (map != NULL) - remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot, + { + attrmap = map->attrMap; + remoteslot_part = execute_attr_map_slot(attrmap, remoteslot, remoteslot_part); + } else { remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); @@ -1716,6 +1728,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); + /* Check if we can do the update or delete on the leaf partition. */ + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + check_relation_updatable(part_entry); + } + switch (operation) { case CMD_INSERT: @@ -1737,15 +1757,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * suitable partition. */ { - AttrMap *attrmap = map ? map->attrMap : NULL; - LogicalRepRelMapEntry *part_entry; TupleTableSlot *localslot; ResultRelInfo *partrelinfo_new; bool found; - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); - /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(estate, partrel, &part_entry->remoterel, diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 568e4d104e0..dfe2cb6deae 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 70; +use Test::More tests => 71; # setup @@ -856,3 +856,17 @@ $node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber2->safe_psql('postgres', "SELECT a, b, c FROM tab5 ORDER BY 1"); is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher'); + +# Test that replication works correctly as long as the leaf partition +# has the necessary REPLICA IDENTITY, even though the actual target +# partitioned table does not. +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab5 REPLICA IDENTITY NOTHING"); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5_1 ORDER BY 1"); +is($result, qq(4||1), 'updates of tab5 replicated correctly');