From f1ac27bfda6ce8a399d8001843e9aefff5814f9b Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 6 Apr 2020 15:15:52 +0200 Subject: [PATCH] Add logical replication support to replicate into partitioned tables Mainly, this adds support code in logical/worker.c for applying replicated operations whose target is a partitioned table to its relevant partitions. Author: Amit Langote Reviewed-by: Rafia Sabih Reviewed-by: Peter Eisentraut Reviewed-by: Petr Jelinek Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com --- doc/src/sgml/logical-replication.sgml | 19 +- src/backend/executor/execReplication.c | 14 +- src/backend/replication/logical/relation.c | 189 ++++++++++++ src/backend/replication/logical/tablesync.c | 1 - src/backend/replication/logical/worker.c | 316 +++++++++++++++++++- src/include/replication/logicalrelation.h | 2 + src/test/subscription/t/013_partition.pl | 182 ++++++++--- 7 files changed, 645 insertions(+), 78 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 8bd7c9c8ac0..c513621470a 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,18 +402,21 @@ - Replication is only supported by tables, partitioned or not, although a - given table must either be partitioned on both servers or not partitioned - at all. Also, when replicating between partitioned tables, the actual - replication occurs between leaf partitions, so partitions on the two - servers must match one-to-one. - - - + Replication is only supported by tables, including partitioned tables. Attempts to replicate other types of relations such as views, materialized views, or foreign tables, will result in an error. + + + + When replicating between partitioned tables, the actual replication + originates from the leaf partitions on the publisher, so partitions on + the publisher must also exist on the subscriber as valid target tables. + (They could either be leaf partitions themselves, or they could be + further subpartitioned, or they could even be independent tables.) + + diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 7194becfd99..dc8a01a5cd5 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -594,17 +594,9 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { /* - * We currently only support writing to regular tables. However, give a - * more specific error for partitioned and foreign tables. + * Give a more specific error for foreign tables. 
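+	 * (Partitioned tables are no longer rejected here: the check below now
+	 * accepts both RELKIND_RELATION and RELKIND_PARTITIONED_TABLE, and the
+	 * apply worker routes changes targeting a partitioned table to its leaf
+	 * partitions.)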
*/ - if (relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - nspname, relname), - errdetail("\"%s.%s\" is a partitioned table.", - nspname, relname))); - else if (relkind == RELKIND_FOREIGN_TABLE) + if (relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", @@ -612,7 +604,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 3d7291b9703..2e7b755aebb 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL; static HTAB *LogicalRepRelMap = NULL; static HTAB *LogicalRepTypMap = NULL; +/* + * Partition map (LogicalRepPartMap) + * + * When a partitioned table is used as replication target, replicated + * operations are actually performed on its leaf partitions, which requires + * the partitions to also be mapped to the remote relation. Parent's entry + * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because + * individual partitions may have different attribute numbers, which means + * attribute mappings to remote relation's attributes must be maintained + * separately for each partition. + */ +static MemoryContext LogicalRepPartMapContext = NULL; +static HTAB *LogicalRepPartMap = NULL; +typedef struct LogicalRepPartMapEntry +{ + Oid partoid; /* LogicalRepPartMap's key */ + LogicalRepRelMapEntry relmapentry; +} LogicalRepPartMapEntry; /* * Relcache invalidation callback for our relation map cache. @@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid) Assert(OidIsValid(entry->remoteid)); return psprintf("%s.%s", entry->nspname, entry->typname); } + +/* + * Partition cache: look up partition LogicalRepRelMapEntry's + * + * Unlike relation map cache, this is keyed by partition OID, not remote + * relation OID, because we only have to use this cache in the case where + * partitions are not directly mapped to any remote relation, such as when + * replication is occurring with one of their ancestors as target. + */ + +/* + * Relcache invalidation callback + */ +static void +logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) +{ + LogicalRepRelMapEntry *entry; + + /* Just to be sure. */ + if (LogicalRepPartMap == NULL) + return; + + if (reloid != InvalidOid) + { + HASH_SEQ_STATUS status; + + hash_seq_init(&status, LogicalRepPartMap); + + /* TODO, use inverse lookup hashtable? */ + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + if (entry->localreloid == reloid) + { + entry->localreloid = InvalidOid; + hash_seq_term(&status); + break; + } + } + } + else + { + /* invalidate all cache entries */ + HASH_SEQ_STATUS status; + + hash_seq_init(&status, LogicalRepPartMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + entry->localreloid = InvalidOid; + } +} + +/* + * Initialize the partition map cache. 
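+ *
+ * Entries are keyed by partition OID and created lazily by
+ * logicalrep_partition_open().  The relcache invalidation callback above
+ * marks an entry stale by resetting its localreloid when the underlying
+ * partition changes.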
+ */ +static void +logicalrep_partmap_init(void) +{ + HASHCTL ctl; + + if (!LogicalRepPartMapContext) + LogicalRepPartMapContext = + AllocSetContextCreate(CacheMemoryContext, + "LogicalRepPartMapContext", + ALLOCSET_DEFAULT_SIZES); + + /* Initialize the relation hash table. */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); /* partition OID */ + ctl.entrysize = sizeof(LogicalRepPartMapEntry); + ctl.hcxt = LogicalRepPartMapContext; + + LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb, + (Datum) 0); +} + +/* + * logicalrep_partition_open + * + * Returned entry reuses most of the values of the root table's entry, save + * the attribute map, which can be different for the partition. + * + * Note there's no logialrep_partition_close, because the caller closes the + * the component relation. + */ +LogicalRepRelMapEntry * +logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map) +{ + LogicalRepRelMapEntry *entry; + LogicalRepPartMapEntry *part_entry; + LogicalRepRelation *remoterel = &root->remoterel; + Oid partOid = RelationGetRelid(partrel); + AttrMap *attrmap = root->attrmap; + bool found; + int i; + MemoryContext oldctx; + + if (LogicalRepPartMap == NULL) + logicalrep_partmap_init(); + + /* Search for existing entry. */ + part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap, + (void *) &partOid, + HASH_ENTER, &found); + + if (found) + return &part_entry->relmapentry; + + memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + + /* Switch to longer-lived context. */ + oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); + + part_entry->partoid = partOid; + + /* Remote relation is used as-is from the root entry. */ + entry = &part_entry->relmapentry; + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + + entry->localrel = partrel; + entry->localreloid = partOid; + + /* + * If the partition's attributes don't match the root relation's, we'll + * need to make a new attrmap which maps partition attribute numbers to + * remoterel's, instead the original which maps root relation's attribute + * numbers to remoterel's. + * + * Note that 'map' which comes from the tuple routing data structure + * contains 1-based attribute numbers (of the parent relation). However, + * the map in 'entry', a logical replication data structure, contains + * 0-based attribute numbers (of the remote relation). + */ + if (map) + { + AttrNumber attno; + + entry->attrmap = make_attrmap(map->maplen); + for (attno = 0; attno < entry->attrmap->maplen; attno++) + { + AttrNumber root_attno = map->attnums[attno]; + + entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; + } + } + else + entry->attrmap = attrmap; + + entry->updatable = root->updatable; + + /* state and statelsn are left set to 0. 
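+ *
+ * (Attribute-map illustration under assumed column orders: if the remote
+ * relation and the root table both have columns (a, b, c) while the
+ * partition has them as (c, b, a), then map->attnums is {3, 2, 1} and the
+ * loop above yields entry->attrmap->attnums = {2, 1, 0}, i.e. the 0-based
+ * remote positions of c, b and a.)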
*/ + MemoryContextSwitchTo(oldctx); + + return entry; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a60c6661538..c27d9705895 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -762,7 +762,6 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); - Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 673ebd211d1..a752a1224d6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -29,11 +29,14 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "catalog/namespace.h" +#include "catalog/partition.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "executor/execPartition.h" #include "executor/nodeModifyTable.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot); +static void apply_handle_tuple_routing(ResultRelInfo *relinfo, + EState *estate, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry, + CmdType operation); /* * Should this worker apply changes for given relation. @@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_insert_internal(estate->es_result_relation_info, estate, - remoteslot); + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, NULL, rel, CMD_INSERT); + else + apply_handle_insert_internal(estate->es_result_relation_info, estate, + remoteslot); PopActiveSnapshot(); @@ -767,9 +780,13 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_update_internal(estate->es_result_relation_info, estate, - remoteslot, &newtup, rel); + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel, CMD_UPDATE); + else + apply_handle_update_internal(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel); PopActiveSnapshot(); @@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s) slot_store_cstrings(remoteslot, rel, oldtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_delete_internal(estate->es_result_relation_info, estate, - remoteslot, &rel->remoterel); + /* For a partitioned table, apply delete to correct partition. 
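+ * The search tuple built above from the old tuple's values is what routes
+ * the delete to the matching leaf partition.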
*/ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, NULL, rel, CMD_DELETE); + else + apply_handle_delete_internal(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); PopActiveSnapshot(); @@ -975,6 +996,235 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, return found; } +/* + * This handles insert, update, delete on a partitioned table. + */ +static void +apply_handle_tuple_routing(ResultRelInfo *relinfo, + EState *estate, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry, + CmdType operation) +{ + Relation parentrel = relinfo->ri_RelationDesc; + ModifyTableState *mtstate = NULL; + PartitionTupleRouting *proute = NULL; + ResultRelInfo *partrelinfo; + Relation partrel; + TupleTableSlot *remoteslot_part; + PartitionRoutingInfo *partinfo; + TupleConversionMap *map; + MemoryContext oldctx; + + /* ModifyTableState is needed for ExecFindPartition(). */ + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = operation; + mtstate->resultRelInfo = relinfo; + proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel); + + /* + * Find the partition to which the "search tuple" belongs. + */ + Assert(remoteslot != NULL); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrelinfo = ExecFindPartition(mtstate, relinfo, proute, + remoteslot, estate); + Assert(partrelinfo != NULL); + partrel = partrelinfo->ri_RelationDesc; + + /* + * To perform any of the operations below, the tuple must match the + * partition's rowtype. Convert if needed or just copy, using a dedicated + * slot to store the tuple in any case. + */ + partinfo = partrelinfo->ri_PartitionInfo; + remoteslot_part = partinfo->pi_PartitionTupleSlot; + if (remoteslot_part == NULL) + remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable); + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot, + remoteslot_part); + else + { + remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); + slot_getallattrs(remoteslot_part); + } + MemoryContextSwitchTo(oldctx); + + estate->es_result_relation_info = partrelinfo; + switch (operation) + { + case CMD_INSERT: + apply_handle_insert_internal(partrelinfo, estate, + remoteslot_part); + break; + + case CMD_DELETE: + apply_handle_delete_internal(partrelinfo, estate, + remoteslot_part, + &relmapentry->remoterel); + break; + + case CMD_UPDATE: + + /* + * For UPDATE, depending on whether or not the updated tuple + * satisfies the partition's constraint, perform a simple UPDATE + * of the partition or move the updated tuple into a different + * suitable partition. + */ + { + AttrMap *attrmap = map ? map->attrMap : NULL; + LogicalRepRelMapEntry *part_entry; + TupleTableSlot *localslot; + ResultRelInfo *partrelinfo_new; + bool found; + + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + + /* Get the matching local tuple from the partition. */ + found = FindReplTupleInLocalRel(estate, partrel, + &part_entry->remoterel, + remoteslot_part, &localslot); + + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + if (found) + { + /* Apply the update. 
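+ * slot_modify_cstrings() overlays the columns marked as changed in newtup
+ * onto the local tuple found above, producing the new row version in
+ * remoteslot_part.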
*/ + slot_modify_cstrings(remoteslot_part, localslot, + part_entry, + newtup->values, newtup->changed); + MemoryContextSwitchTo(oldctx); + } + else + { + /* + * The tuple to be updated could not be found. + * + * TODO what to do here, change the log level to LOG + * perhaps? + */ + elog(DEBUG1, + "logical replication did not find row for update " + "in replication target relation \"%s\"", + RelationGetRelationName(partrel)); + } + + /* + * Does the updated tuple still satisfy the current + * partition's constraint? + */ + if (partrelinfo->ri_PartitionCheck == NULL || + ExecPartitionCheck(partrelinfo, remoteslot_part, estate, + false)) + { + /* + * Yes, so simply UPDATE the partition. We don't call + * apply_handle_update_internal() here, which would + * normally do the following work, to avoid repeating some + * work already done above to find the local tuple in the + * partition. + */ + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + ExecOpenIndices(partrelinfo, false); + + EvalPlanQualSetSlot(&epqstate, remoteslot_part); + ExecSimpleRelationUpdate(estate, &epqstate, localslot, + remoteslot_part); + ExecCloseIndices(partrelinfo); + EvalPlanQualEnd(&epqstate); + } + else + { + /* Move the tuple into the new partition. */ + + /* + * New partition will be found using tuple routing, which + * can only occur via the parent table. We might need to + * convert the tuple to the parent's rowtype. Note that + * this is the tuple found in the partition, not the + * original search tuple received by this function. + */ + if (map) + { + TupleConversionMap *PartitionToRootMap = + convert_tuples_by_name(RelationGetDescr(partrel), + RelationGetDescr(parentrel)); + + remoteslot = + execute_attr_map_slot(PartitionToRootMap->attrMap, + remoteslot_part, remoteslot); + } + else + { + remoteslot = ExecCopySlot(remoteslot, remoteslot_part); + slot_getallattrs(remoteslot); + } + + + /* Find the new partition. */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrelinfo_new = ExecFindPartition(mtstate, relinfo, + proute, remoteslot, + estate); + MemoryContextSwitchTo(oldctx); + Assert(partrelinfo_new != partrelinfo); + + /* DELETE old tuple found in the old partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_delete_internal(partrelinfo, estate, + localslot, + &relmapentry->remoterel); + + /* INSERT new tuple into the new partition. */ + + /* + * Convert the replacement tuple to match the destination + * partition rowtype. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrel = partrelinfo_new->ri_RelationDesc; + partinfo = partrelinfo_new->ri_PartitionInfo; + remoteslot_part = partinfo->pi_PartitionTupleSlot; + if (remoteslot_part == NULL) + remoteslot_part = table_slot_create(partrel, + &estate->es_tupleTable); + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + remoteslot_part = execute_attr_map_slot(map->attrMap, + remoteslot, + remoteslot_part); + } + else + { + remoteslot_part = ExecCopySlot(remoteslot_part, + remoteslot); + slot_getallattrs(remoteslot); + } + MemoryContextSwitchTo(oldctx); + estate->es_result_relation_info = partrelinfo_new; + apply_handle_insert_internal(partrelinfo_new, estate, + remoteslot_part); + } + } + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } + + ExecCleanupTupleRouting(mtstate, proute); +} + /* * Handle TRUNCATE message. 
* @@ -988,6 +1238,7 @@ apply_handle_truncate(StringInfo s) List *remote_relids = NIL; List *remote_rels = NIL; List *rels = NIL; + List *part_rels = NIL; List *relids = NIL; List *relids_logged = NIL; ListCell *lc; @@ -1017,6 +1268,47 @@ apply_handle_truncate(StringInfo s) relids = lappend_oid(relids, rel->localreloid); if (RelationIsLogicallyLogged(rel->localrel)) relids_logged = lappend_oid(relids_logged, rel->localreloid); + + /* + * Truncate partitions if we got a message to truncate a partitioned + * table. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ListCell *child; + List *children = find_all_inheritors(rel->localreloid, + RowExclusiveLock, + NULL); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(relids, childrelid)) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childrelid, NoLock); + + /* + * Ignore temp tables of other backends. See similar code in + * ExecuteTruncate(). + */ + if (RELATION_IS_OTHER_TEMP(childrel)) + { + table_close(childrel, RowExclusiveLock); + continue; + } + + rels = lappend(rels, childrel); + part_rels = lappend(part_rels, childrel); + relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(childrel)) + relids_logged = lappend_oid(relids_logged, childrelid); + } + } } /* @@ -1032,6 +1324,12 @@ apply_handle_truncate(StringInfo s) logicalrep_rel_close(rel, NoLock); } + foreach(lc, part_rels) + { + Relation rel = lfirst(lc); + + table_close(rel, NoLock); + } CommandCounterIncrement(); } diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 9971a8028ca..4650b4f9e1b 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -34,6 +34,8 @@ extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode); +extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map); extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index ea5812ce189..5db1b21c594 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 15; +use Test::More tests => 24; # setup @@ -33,29 +33,49 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_def PARTITION OF tab1 DEFAULT"); $node_publisher->safe_psql('postgres', "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1"); # subscriber1 +# +# This is partitioned differently from the publisher. tab1_2 is +# subpartitioned. This tests the tuple routing code on the +# subscriber. 
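+#
+# Layouts, for reference:
+#   publisher:   tab1 LIST (a) -> tab1_1 (1,2,3), tab1_2 (4,5,6), tab1_def DEFAULT
+#   subscriber1: tab1 LIST (a) -> tab1_1 (1,2,3),
+#                tab1_2 (4,5,6) -> tab1_2_1 (5), tab1_2_2 (4,6),
+#                tab1_def DEFAULT
+# Column orders also differ from the publisher's.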
$node_subscriber1->safe_psql('postgres', - "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)"); + "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"); + $node_subscriber1->safe_psql('postgres', - "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)"); + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); $node_subscriber1->safe_psql('postgres', - "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (4, 5, 6) PARTITION BY LIST (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1_2 ATTACH PARTITION tab1_2_1 FOR VALUES IN (5)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_def PARTITION OF tab1 (c DEFAULT 'sub1_tab1') DEFAULT"); $node_subscriber1->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); # subscriber 2 +# +# This does not use partitioning. The tables match the leaf tables on +# the publisher. $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_def (a int PRIMARY KEY, b text, c text DEFAULT 'sub2_tab1_def')"); $node_subscriber2->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"); @@ -74,59 +94,122 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (0)"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); my $result = $node_subscriber1->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); -is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated'); + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is($result, qq(sub1_tab1|0 +sub1_tab1|1 +sub1_tab1|3 +sub1_tab1|5), 'inserts into tab1 and its partitions replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_1 ORDER BY 1"); +is($result, qq(5), 'inserts into tab1_2 replicated into tab1_2_1 correctly'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(), 'inserts into tab1_2 replicated into tab1_2_2 correctly'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); -is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated'); + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_1|1 +sub2_tab1_1|3), 'inserts into tab1_1 replicated'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); -is($result, qq(sub2_tab1_2|1|5|5), 'inserts into 
tab1_2 replicated'); + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated'); -# update (no partition change) +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_def ORDER BY 1, 2"); +is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated'); + +# update (replicated as update) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1"); - -$node_publisher->wait_for_catchup('sub1'); -$node_publisher->wait_for_catchup('sub2'); - -$result = $node_subscriber1->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); -is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); - -$result = $node_subscriber2->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); -is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated'); - -# update (partition changes) +# All of the following cause an update to be applied to a partitioned +# table on subscriber1: tab1_2 is leaf partition on publisher, whereas +# it's sub-partitioned on subscriber1. $node_publisher->safe_psql('postgres', - "UPDATE tab1 SET a = 6 WHERE a = 2"); + "UPDATE tab1 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 4 WHERE a = 6"); +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 6 WHERE a = 4"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber1->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); -is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated'); + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is($result, qq(sub1_tab1|0 +sub1_tab1|2 +sub1_tab1|3 +sub1_tab1|6), 'update of tab1_1, tab1_2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_1 ORDER BY 1"); +is($result, qq(), 'updates of tab1_2 replicated into tab1_2_1 correctly'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); -is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated'); + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_1|2 +sub2_tab1_1|3), 'update of tab1_1 replicated'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); -is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|6), 'tab1_2 updated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_def ORDER BY 1"); +is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged'); + +# update (replicated as delete+insert) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 1 WHERE a = 0"); +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 4 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is($result, qq(sub1_tab1|2 +sub1_tab1|3 +sub1_tab1|4 +sub1_tab1|6), 'update of tab1 (delete from tab1_def + insert into tab1_1) replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(4 +6), 'updates of 
tab1 (delete + insert) replicated into tab1_2_2 correctly'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_1|2 +sub2_tab1_1|3), 'tab1_1 unchanged'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|4 +sub2_tab1_2|6), 'insert into tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab1_def ORDER BY 1"); +is($result, qq(), 'delete from tab1_def replicated'); # delete $node_publisher->safe_psql('postgres', - "DELETE FROM tab1 WHERE a IN (3, 5)"); + "DELETE FROM tab1 WHERE a IN (2, 3, 5)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab1_2"); @@ -134,22 +217,22 @@ $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber1->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1"); -is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated'); + "SELECT a FROM tab1"); +is($result, qq(), 'delete from tab1_1, tab1_2 replicated'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1_1"); -is($result, qq(0||), 'delete from tab1_1 replicated'); + "SELECT a FROM tab1_1"); +is($result, qq(), 'delete from tab1_1 replicated'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1_2"); -is($result, qq(0||), 'delete from tab1_2 replicated'); + "SELECT a FROM tab1_2"); +is($result, qq(), 'delete from tab1_2 replicated'); # truncate $node_subscriber1->safe_psql('postgres', - "INSERT INTO tab1 VALUES (1), (2), (5)"); + "INSERT INTO tab1 (a) VALUES (1), (2), (5)"); $node_subscriber2->safe_psql('postgres', - "INSERT INTO tab1_2 VALUES (2)"); + "INSERT INTO tab1_2 (a) VALUES (2)"); $node_publisher->safe_psql('postgres', "TRUNCATE tab1_2"); @@ -157,12 +240,13 @@ $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber1->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1"); -is($result, qq(2|1|2), 'truncate of tab1_2 replicated'); + "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(1 +2), 'truncate of tab1_2 replicated'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1_2"); -is($result, qq(0||), 'truncate of tab1_2 replicated'); + "SELECT a FROM tab1_2 ORDER BY 1"); +is($result, qq(), 'truncate of tab1_2 replicated'); $node_publisher->safe_psql('postgres', "TRUNCATE tab1"); @@ -171,8 +255,8 @@ $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber1->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1"); -is($result, qq(0||), 'truncate of tab1_1 replicated'); + "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(), 'truncate of tab1_1 replicated'); $result = $node_subscriber2->safe_psql('postgres', - "SELECT count(*), min(a), max(a) FROM tab1"); -is($result, qq(0||), 'truncate of tab1_1 replicated'); + "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(), 'truncate of tab1 replicated');