diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 8bd7c9c8ac0..c513621470a 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -402,18 +402,21 @@
- Replication is only supported by tables, partitioned or not, although a
- given table must either be partitioned on both servers or not partitioned
- at all. Also, when replicating between partitioned tables, the actual
- replication occurs between leaf partitions, so partitions on the two
- servers must match one-to-one.
-
-
-
+ Replication is only supported by tables, including partitioned tables.
Attempts to replicate other types of relations such as views, materialized
views, or foreign tables, will result in an error.
+
+
+
+ When replicating between partitioned tables, the actual replication
+ originates from the leaf partitions on the publisher, so partitions on
+ the publisher must also exist on the subscriber as valid target tables.
+ (They could either be leaf partitions themselves, or they could be
+ further subpartitioned, or they could even be independent tables.)
+
+
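A minimal SQL sketch of the setup the new paragraph describes, using the
table names from the regression test at the end of this patch (columns
simplified; the connection string is a placeholder):

    -- publisher
    CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a);
    CREATE TABLE tab1_1 PARTITION OF tab1 FOR VALUES IN (1, 2, 3);
    CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6);
    CREATE PUBLICATION pub1 FOR TABLE tab1;

    -- subscriber: the publisher's leaf partitions exist here as valid
    -- targets; tab1_2 happens to be further subpartitioned
    CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a);
    CREATE TABLE tab1_1 PARTITION OF tab1 FOR VALUES IN (1, 2, 3);
    CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)
        PARTITION BY LIST (a);
    CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5);
    CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6);
    CREATE SUBSCRIPTION sub1 CONNECTION '...' PUBLICATION pub1;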
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 7194becfd99..dc8a01a5cd5 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -594,17 +594,9 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
/*
- * We currently only support writing to regular tables. However, give a
- * more specific error for partitioned and foreign tables.
+ * Give a more specific error for foreign tables.
*/
- if (relkind == RELKIND_PARTITIONED_TABLE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot use relation \"%s.%s\" as logical replication target",
- nspname, relname),
- errdetail("\"%s.%s\" is a partitioned table.",
- nspname, relname)));
- else if (relkind == RELKIND_FOREIGN_TABLE)
+ if (relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
@@ -612,7 +604,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
errdetail("\"%s.%s\" is a foreign table.",
nspname, relname)));
- if (relkind != RELKIND_RELATION)
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 3d7291b9703..2e7b755aebb 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
+/*
+ * Partition map (LogicalRepPartMap)
+ *
+ * When a partitioned table is used as a replication target, replicated
+ * operations are actually performed on its leaf partitions, which requires
+ * the partitions to also be mapped to the remote relation.  The parent's
+ * entry (LogicalRepRelMapEntry) cannot be used as-is for all partitions,
+ * because individual partitions may have different attribute numbers, which
+ * means attribute mappings to the remote relation's attributes must be
+ * maintained separately for each partition.
+ */
+static MemoryContext LogicalRepPartMapContext = NULL;
+static HTAB *LogicalRepPartMap = NULL;
+typedef struct LogicalRepPartMapEntry
+{
+ Oid partoid; /* LogicalRepPartMap's key */
+ LogicalRepRelMapEntry relmapentry;
+} LogicalRepPartMapEntry;
/*
* Relcache invalidation callback for our relation map cache.
@@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid)
Assert(OidIsValid(entry->remoteid));
return psprintf("%s.%s", entry->nspname, entry->typname);
}
+
+/*
+ * Partition cache: look up partition LogicalRepRelMapEntry's
+ *
+ * Unlike the relation map cache, this is keyed by the partition OID, not the
+ * remote relation OID, because we only need this cache in cases where
+ * partitions are not directly mapped to any remote relation, such as when
+ * replication is occurring with one of their ancestors as the target.
+ */
+
+/*
+ * Relcache invalidation callback
+ */
+static void
+logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
+{
+ LogicalRepPartMapEntry *entry;
+
+ /* Just to be sure. */
+ if (LogicalRepPartMap == NULL)
+ return;
+
+ if (reloid != InvalidOid)
+ {
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LogicalRepPartMap);
+
+ /* TODO: use an inverse lookup hash table? */
+ while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+ {
+ if (entry->relmapentry.localreloid == reloid)
+ {
+ entry->relmapentry.localreloid = InvalidOid;
+ hash_seq_term(&status);
+ break;
+ }
+ }
+ }
+ else
+ {
+ /* invalidate all cache entries */
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LogicalRepPartMap);
+
+ while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+ entry->relmapentry.localreloid = InvalidOid;
+ }
+}
+
+/*
+ * Initialize the partition map cache.
+ */
+static void
+logicalrep_partmap_init(void)
+{
+ HASHCTL ctl;
+
+ if (!LogicalRepPartMapContext)
+ LogicalRepPartMapContext =
+ AllocSetContextCreate(CacheMemoryContext,
+ "LogicalRepPartMapContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /* Initialize the relation hash table. */
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(Oid); /* partition OID */
+ ctl.entrysize = sizeof(LogicalRepPartMapEntry);
+ ctl.hcxt = LogicalRepPartMapContext;
+
+ LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+ /* Watch for invalidation events. */
+ CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
+ (Datum) 0);
+}
+
+/*
+ * logicalrep_partition_open
+ *
+ * The returned entry reuses most of the values of the root table's entry,
+ * except the attribute map, which can be different for the partition.
+ *
+ * Note there's no logicalrep_partition_close, because the caller closes the
+ * component relation.
+ */
+LogicalRepRelMapEntry *
+logicalrep_partition_open(LogicalRepRelMapEntry *root,
+ Relation partrel, AttrMap *map)
+{
+ LogicalRepRelMapEntry *entry;
+ LogicalRepPartMapEntry *part_entry;
+ LogicalRepRelation *remoterel = &root->remoterel;
+ Oid partOid = RelationGetRelid(partrel);
+ AttrMap *attrmap = root->attrmap;
+ bool found;
+ int i;
+ MemoryContext oldctx;
+
+ if (LogicalRepPartMap == NULL)
+ logicalrep_partmap_init();
+
+ /* Search for existing entry. */
+ part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
+ (void *) &partOid,
+ HASH_ENTER, &found);
+
+ if (found)
+ return &part_entry->relmapentry;
+
+ memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+
+ /* Switch to longer-lived context. */
+ oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
+
+ part_entry->partoid = partOid;
+
+ /* Remote relation is used as-is from the root entry. */
+ entry = &part_entry->relmapentry;
+ entry->remoterel.remoteid = remoterel->remoteid;
+ entry->remoterel.nspname = pstrdup(remoterel->nspname);
+ entry->remoterel.relname = pstrdup(remoterel->relname);
+ entry->remoterel.natts = remoterel->natts;
+ entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+ entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+ for (i = 0; i < remoterel->natts; i++)
+ {
+ entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+ entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+ }
+ entry->remoterel.replident = remoterel->replident;
+ entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+
+ entry->localrel = partrel;
+ entry->localreloid = partOid;
+
+ /*
+ * If the partition's attributes don't match the root relation's, we'll
+ * need to make a new attrmap which maps partition attribute numbers to
+ * remoterel's, instead of the original which maps the root relation's
+ * attribute numbers to remoterel's.
+ *
+ * Note that 'map' which comes from the tuple routing data structure
+ * contains 1-based attribute numbers (of the parent relation). However,
+ * the map in 'entry', a logical replication data structure, contains
+ * 0-based attribute numbers (of the remote relation).
+ */
+ if (map)
+ {
+ AttrNumber attno;
+
+ entry->attrmap = make_attrmap(map->maplen);
+ for (attno = 0; attno < entry->attrmap->maplen; attno++)
+ {
+ AttrNumber root_attno = map->attnums[attno];
+
+ entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+ }
+ }
+ else
+ entry->attrmap = attrmap;
+
+ entry->updatable = root->updatable;
+
+ /* state and statelsn are left set to 0. */
+ MemoryContextSwitchTo(oldctx);
+
+ return entry;
+}
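A worked example of why the partition map keeps its own attribute maps, taken
from the regression test below: the root and a leaf declare the same columns
in different orders, so a given remote column corresponds to different local
attribute numbers in each:

    -- subscriber root: column a is attribute 2, b is attribute 3
    CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text)
        PARTITION BY LIST (a);
    -- leaf created standalone with yet another order: a is attribute 3,
    -- b is attribute 2, so the root's attrmap would be wrong here
    CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL);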
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index a60c6661538..c27d9705895 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -762,7 +762,6 @@ copy_table(Relation rel)
/* Map the publisher relation to local one. */
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
- Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
/* Start copy on the publisher. */
initStringInfo(&cmd);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 673ebd211d1..a752a1224d6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -29,11 +29,14 @@
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
+#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
+#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
@@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
+static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
+ EState *estate,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ LogicalRepRelMapEntry *relmapentry,
+ CmdType operation);
/*
* Should this worker apply changes for given relation.
@@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s)
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
- Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
- apply_handle_insert_internal(estate->es_result_relation_info, estate,
- remoteslot);
+ /* For a partitioned table, insert the tuple into a partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+ remoteslot, NULL, rel, CMD_INSERT);
+ else
+ apply_handle_insert_internal(estate->es_result_relation_info, estate,
+ remoteslot);
PopActiveSnapshot();
@@ -767,9 +780,13 @@ apply_handle_update(StringInfo s)
has_oldtup ? oldtup.values : newtup.values);
MemoryContextSwitchTo(oldctx);
- Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
- apply_handle_update_internal(estate->es_result_relation_info, estate,
- remoteslot, &newtup, rel);
+ /* For a partitioned table, apply update to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+ remoteslot, &newtup, rel, CMD_UPDATE);
+ else
+ apply_handle_update_internal(estate->es_result_relation_info, estate,
+ remoteslot, &newtup, rel);
PopActiveSnapshot();
@@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s)
slot_store_cstrings(remoteslot, rel, oldtup.values);
MemoryContextSwitchTo(oldctx);
- Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
- apply_handle_delete_internal(estate->es_result_relation_info, estate,
- remoteslot, &rel->remoterel);
+ /* For a partitioned table, apply delete to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+ remoteslot, NULL, rel, CMD_DELETE);
+ else
+ apply_handle_delete_internal(estate->es_result_relation_info, estate,
+ remoteslot, &rel->remoterel);
PopActiveSnapshot();
@@ -975,6 +996,235 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
return found;
}
+/*
+ * This handles insert, update, and delete on a partitioned table.
+ */
+static void
+apply_handle_tuple_routing(ResultRelInfo *relinfo,
+ EState *estate,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ LogicalRepRelMapEntry *relmapentry,
+ CmdType operation)
+{
+ Relation parentrel = relinfo->ri_RelationDesc;
+ ModifyTableState *mtstate = NULL;
+ PartitionTupleRouting *proute = NULL;
+ ResultRelInfo *partrelinfo;
+ Relation partrel;
+ TupleTableSlot *remoteslot_part;
+ PartitionRoutingInfo *partinfo;
+ TupleConversionMap *map;
+ MemoryContext oldctx;
+
+ /* ModifyTableState is needed for ExecFindPartition(). */
+ mtstate = makeNode(ModifyTableState);
+ mtstate->ps.plan = NULL;
+ mtstate->ps.state = estate;
+ mtstate->operation = operation;
+ mtstate->resultRelInfo = relinfo;
+ proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
+
+ /*
+ * Find the partition to which the "search tuple" belongs.
+ */
+ Assert(remoteslot != NULL);
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
+ remoteslot, estate);
+ Assert(partrelinfo != NULL);
+ partrel = partrelinfo->ri_RelationDesc;
+
+ /*
+ * To perform any of the operations below, the tuple must match the
+ * partition's rowtype. Convert if needed or just copy, using a dedicated
+ * slot to store the tuple in any case.
+ */
+ partinfo = partrelinfo->ri_PartitionInfo;
+ remoteslot_part = partinfo->pi_PartitionTupleSlot;
+ if (remoteslot_part == NULL)
+ remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
+ map = partinfo->pi_RootToPartitionMap;
+ if (map != NULL)
+ remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+ remoteslot_part);
+ else
+ {
+ remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
+ slot_getallattrs(remoteslot_part);
+ }
+ MemoryContextSwitchTo(oldctx);
+
+ estate->es_result_relation_info = partrelinfo;
+ switch (operation)
+ {
+ case CMD_INSERT:
+ apply_handle_insert_internal(partrelinfo, estate,
+ remoteslot_part);
+ break;
+
+ case CMD_DELETE:
+ apply_handle_delete_internal(partrelinfo, estate,
+ remoteslot_part,
+ &relmapentry->remoterel);
+ break;
+
+ case CMD_UPDATE:
+
+ /*
+ * For UPDATE, depending on whether or not the updated tuple
+ * satisfies the partition's constraint, perform a simple UPDATE
+ * of the partition or move the updated tuple into a different
+ * suitable partition.
+ */
+ {
+ AttrMap *attrmap = map ? map->attrMap : NULL;
+ LogicalRepRelMapEntry *part_entry;
+ TupleTableSlot *localslot;
+ ResultRelInfo *partrelinfo_new;
+ bool found;
+
+ part_entry = logicalrep_partition_open(relmapentry, partrel,
+ attrmap);
+
+ /* Get the matching local tuple from the partition. */
+ found = FindReplTupleInLocalRel(estate, partrel,
+ &part_entry->remoterel,
+ remoteslot_part, &localslot);
+
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ if (found)
+ {
+ /* Apply the update. */
+ slot_modify_cstrings(remoteslot_part, localslot,
+ part_entry,
+ newtup->values, newtup->changed);
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ {
+ /*
+ * The tuple to be updated could not be found.
+ *
+ * TODO what to do here, change the log level to LOG
+ * perhaps?
+ */
+ elog(DEBUG1,
+ "logical replication did not find row for update "
+ "in replication target relation \"%s\"",
+ RelationGetRelationName(partrel));
+ }
+
+ /*
+ * Does the updated tuple still satisfy the current
+ * partition's constraint?
+ */
+ if (partrelinfo->ri_PartitionCheck == NULL ||
+ ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
+ false))
+ {
+ /*
+ * Yes, so simply UPDATE the partition. We don't call
+ * apply_handle_update_internal() here, which would
+ * normally do this work, to avoid repeating the search
+ * for the local tuple in the partition that was already
+ * done above.
+ */
+ EPQState epqstate;
+
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+ ExecOpenIndices(partrelinfo, false);
+
+ EvalPlanQualSetSlot(&epqstate, remoteslot_part);
+ ExecSimpleRelationUpdate(estate, &epqstate, localslot,
+ remoteslot_part);
+ ExecCloseIndices(partrelinfo);
+ EvalPlanQualEnd(&epqstate);
+ }
+ else
+ {
+ /* Move the tuple into the new partition. */
+
+ /*
+ * The new partition will be found using tuple routing, which
+ * can only occur via the parent table. We might need to
+ * convert the tuple to the parent's rowtype. Note that
+ * this is the tuple found in the partition, not the
+ * original search tuple received by this function.
+ */
+ if (map)
+ {
+ TupleConversionMap *PartitionToRootMap =
+ convert_tuples_by_name(RelationGetDescr(partrel),
+ RelationGetDescr(parentrel));
+
+ remoteslot =
+ execute_attr_map_slot(PartitionToRootMap->attrMap,
+ remoteslot_part, remoteslot);
+ }
+ else
+ {
+ remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
+ slot_getallattrs(remoteslot);
+ }
+
+ /* Find the new partition. */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ partrelinfo_new = ExecFindPartition(mtstate, relinfo,
+ proute, remoteslot,
+ estate);
+ MemoryContextSwitchTo(oldctx);
+ Assert(partrelinfo_new != partrelinfo);
+
+ /* DELETE old tuple found in the old partition. */
+ estate->es_result_relation_info = partrelinfo;
+ apply_handle_delete_internal(partrelinfo, estate,
+ localslot,
+ &relmapentry->remoterel);
+
+ /* INSERT new tuple into the new partition. */
+
+ /*
+ * Convert the replacement tuple to match the destination
+ * partition rowtype.
+ */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ partrel = partrelinfo_new->ri_RelationDesc;
+ partinfo = partrelinfo_new->ri_PartitionInfo;
+ remoteslot_part = partinfo->pi_PartitionTupleSlot;
+ if (remoteslot_part == NULL)
+ remoteslot_part = table_slot_create(partrel,
+ &estate->es_tupleTable);
+ map = partinfo->pi_RootToPartitionMap;
+ if (map != NULL)
+ {
+ remoteslot_part = execute_attr_map_slot(map->attrMap,
+ remoteslot,
+ remoteslot_part);
+ }
+ else
+ {
+ remoteslot_part = ExecCopySlot(remoteslot_part,
+ remoteslot);
+ slot_getallattrs(remoteslot_part);
+ }
+ MemoryContextSwitchTo(oldctx);
+ estate->es_result_relation_info = partrelinfo_new;
+ apply_handle_insert_internal(partrelinfo_new, estate,
+ remoteslot_part);
+ }
+ }
+ break;
+
+ default:
+ elog(ERROR, "unrecognized CmdType: %d", (int) operation);
+ break;
+ }
+
+ ExecCleanupTupleRouting(mtstate, proute);
+}
+
/*
* Handle TRUNCATE message.
*
@@ -988,6 +1238,7 @@ apply_handle_truncate(StringInfo s)
List *remote_relids = NIL;
List *remote_rels = NIL;
List *rels = NIL;
+ List *part_rels = NIL;
List *relids = NIL;
List *relids_logged = NIL;
ListCell *lc;
@@ -1017,6 +1268,47 @@ apply_handle_truncate(StringInfo s)
relids = lappend_oid(relids, rel->localreloid);
if (RelationIsLogicallyLogged(rel->localrel))
relids_logged = lappend_oid(relids_logged, rel->localreloid);
+
+ /*
+ * Truncate partitions if we got a message to truncate a partitioned
+ * table.
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ ListCell *child;
+ List *children = find_all_inheritors(rel->localreloid,
+ RowExclusiveLock,
+ NULL);
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ if (list_member_oid(relids, childrelid))
+ continue;
+
+ /* find_all_inheritors already got lock */
+ childrel = table_open(childrelid, NoLock);
+
+ /*
+ * Ignore temp tables of other backends. See similar code in
+ * ExecuteTruncate().
+ */
+ if (RELATION_IS_OTHER_TEMP(childrel))
+ {
+ table_close(childrel, RowExclusiveLock);
+ continue;
+ }
+
+ rels = lappend(rels, childrel);
+ part_rels = lappend(part_rels, childrel);
+ relids = lappend_oid(relids, childrelid);
+ /* Log this relation only if needed for logical decoding */
+ if (RelationIsLogicallyLogged(childrel))
+ relids_logged = lappend_oid(relids_logged, childrelid);
+ }
+ }
}
/*
@@ -1032,6 +1324,12 @@ apply_handle_truncate(StringInfo s)
logicalrep_rel_close(rel, NoLock);
}
+ foreach(lc, part_rels)
+ {
+ Relation rel = lfirst(lc);
+
+ table_close(rel, NoLock);
+ }
CommandCounterIncrement();
}
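The CMD_UPDATE branch above can move a row between partitions on the
subscriber even when the publisher replicated a plain UPDATE. Sketched with
the schema from the regression test below:

    -- publisher: 5 and 6 both fall in tab1_2, so this is sent as an UPDATE
    UPDATE tab1 SET a = 6 WHERE a = 5;
    -- subscriber1: tab1_2 is subpartitioned there, with 5 routed to
    -- tab1_2_1 and 6 to tab1_2_2, so the worker deletes the old tuple
    -- from tab1_2_1 and inserts the converted tuple into tab1_2_2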
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 9971a8028ca..4650b4f9e1b 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -34,6 +34,8 @@ extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
LOCKMODE lockmode);
+extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+ Relation partrel, AttrMap *map);
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
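The TRUNCATE handling in worker.c above similarly expands a partitioned
target to its children with find_all_inheritors(). With the test schema:

    -- publisher
    TRUNCATE tab1_2;
    -- subscriber1: tab1_2 is partitioned there, so tab1_2_1 and tab1_2_2
    -- are truncated along with it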
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index ea5812ce189..5db1b21c594 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 15;
+use Test::More tests => 24;
# setup
@@ -33,29 +33,49 @@ $node_publisher->safe_psql('postgres',
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
$node_publisher->safe_psql('postgres',
- "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)");
+ "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1_def PARTITION OF tab1 DEFAULT");
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
# subscriber1
+#
+# This is partitioned differently from the publisher. tab1_2 is
+# subpartitioned. This tests the tuple routing code on the
+# subscriber.
$node_subscriber1->safe_psql('postgres',
- "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)");
+ "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
$node_subscriber1->safe_psql('postgres',
"CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
+
$node_subscriber1->safe_psql('postgres',
- "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)");
+ "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
$node_subscriber1->safe_psql('postgres',
- "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)");
+ "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (4, 5, 6) PARTITION BY LIST (a)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+ "ALTER TABLE tab1_2 ATTACH PARTITION tab1_2_1 FOR VALUES IN (5)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_def PARTITION OF tab1 (c DEFAULT 'sub1_tab1') DEFAULT");
$node_subscriber1->safe_psql('postgres',
"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
# subscriber 2
+#
+# This does not use partitioning. The tables match the leaf tables on
+# the publisher.
$node_subscriber2->safe_psql('postgres',
"CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)");
$node_subscriber2->safe_psql('postgres',
"CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)");
$node_subscriber2->safe_psql('postgres',
"CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_def (a int PRIMARY KEY, b text, c text DEFAULT 'sub2_tab1_def')");
$node_subscriber2->safe_psql('postgres',
"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all");
@@ -74,59 +94,122 @@ $node_publisher->safe_psql('postgres',
"INSERT INTO tab1_1 (a) VALUES (3)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (0)");
$node_publisher->wait_for_catchup('sub1');
$node_publisher->wait_for_catchup('sub2');
my $result = $node_subscriber1->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated');
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub1_tab1|0
+sub1_tab1|1
+sub1_tab1|3
+sub1_tab1|5), 'inserts into tab1 and its partitions replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_1 ORDER BY 1");
+is($result, qq(5), 'inserts into tab1_2 replicated into tab1_2_1 correctly');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(), 'inserts into tab1_2 replicated into tab1_2_2 correctly');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
-is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
+ "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_1|1
+sub2_tab1_1|3), 'inserts into tab1_1 replicated');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
-is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
+ "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated');
-# update (no partition change)
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_def ORDER BY 1, 2");
+is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated');
+
+# update (replicated as update)
$node_publisher->safe_psql('postgres',
"UPDATE tab1 SET a = 2 WHERE a = 1");
-
-$node_publisher->wait_for_catchup('sub1');
-$node_publisher->wait_for_catchup('sub2');
-
-$result = $node_subscriber1->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
-
-$result = $node_subscriber2->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
-is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
-
-# update (partition changes)
+# All of the following cause an update to be applied to a partitioned
+# table on subscriber1: tab1_2 is a leaf partition on the publisher,
+# whereas it's subpartitioned on subscriber1.
$node_publisher->safe_psql('postgres',
- "UPDATE tab1 SET a = 6 WHERE a = 2");
+ "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET a = 4 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET a = 6 WHERE a = 4");
$node_publisher->wait_for_catchup('sub1');
$node_publisher->wait_for_catchup('sub2');
$result = $node_subscriber1->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated');
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub1_tab1|0
+sub1_tab1|2
+sub1_tab1|3
+sub1_tab1|6), 'update of tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_1 ORDER BY 1");
+is($result, qq(), 'updates of tab1_2 replicated into tab1_2_1 correctly');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
-is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
+ "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_1|2
+sub2_tab1_1|3), 'update of tab1_1 replicated');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
-is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated');
+ "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|6), 'tab1_2 updated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_def ORDER BY 1");
+is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged');
+
+# update (replicated as delete+insert)
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET a = 1 WHERE a = 0");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET a = 4 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub1_tab1|2
+sub1_tab1|3
+sub1_tab1|4
+sub1_tab1|6), 'update of tab1 (delete from tab1_def + insert into tab1_1) replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(4
+6), 'updates of tab1 (delete + insert) replicated into tab1_2_2 correctly');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_1|2
+sub2_tab1_1|3), 'tab1_1 unchanged');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|4
+sub2_tab1_2|6), 'insert into tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab1_def ORDER BY 1");
+is($result, qq(), 'delete from tab1_def replicated');
# delete
$node_publisher->safe_psql('postgres',
- "DELETE FROM tab1 WHERE a IN (3, 5)");
+ "DELETE FROM tab1 WHERE a IN (2, 3, 5)");
$node_publisher->safe_psql('postgres',
"DELETE FROM tab1_2");
@@ -134,22 +217,22 @@ $node_publisher->wait_for_catchup('sub1');
$node_publisher->wait_for_catchup('sub2');
$result = $node_subscriber1->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated');
+ "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1_1, tab1_2 replicated');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1_1");
-is($result, qq(0||), 'delete from tab1_1 replicated');
+ "SELECT a FROM tab1_1");
+is($result, qq(), 'delete from tab1_1 replicated');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1_2");
-is($result, qq(0||), 'delete from tab1_2 replicated');
+ "SELECT a FROM tab1_2");
+is($result, qq(), 'delete from tab1_2 replicated');
# truncate
$node_subscriber1->safe_psql('postgres',
- "INSERT INTO tab1 VALUES (1), (2), (5)");
+ "INSERT INTO tab1 (a) VALUES (1), (2), (5)");
$node_subscriber2->safe_psql('postgres',
- "INSERT INTO tab1_2 VALUES (2)");
+ "INSERT INTO tab1_2 (a) VALUES (2)");
$node_publisher->safe_psql('postgres',
"TRUNCATE tab1_2");
@@ -157,12 +240,13 @@ $node_publisher->wait_for_catchup('sub1');
$node_publisher->wait_for_catchup('sub2');
$result = $node_subscriber1->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(2|1|2), 'truncate of tab1_2 replicated');
+ "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(1
+2), 'truncate of tab1_2 replicated');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1_2");
-is($result, qq(0||), 'truncate of tab1_2 replicated');
+ "SELECT a FROM tab1_2 ORDER BY 1");
+is($result, qq(), 'truncate of tab1_2 replicated');
$node_publisher->safe_psql('postgres',
"TRUNCATE tab1");
@@ -171,8 +255,8 @@ $node_publisher->wait_for_catchup('sub1');
$node_publisher->wait_for_catchup('sub2');
$result = $node_subscriber1->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'truncate of tab1_1 replicated');
+ "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(), 'truncate of tab1_1 replicated');
$result = $node_subscriber2->safe_psql('postgres',
- "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'truncate of tab1_1 replicated');
+ "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(), 'truncate of tab1 replicated');