1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Add logical replication support to replicate into partitioned tables

Mainly, this adds support code in logical/worker.c for applying
replicated operations whose target is a partitioned table to its
relevant partitions.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
This commit is contained in:
Peter Eisentraut
2020-04-06 15:15:52 +02:00
parent b7ce6de93b
commit f1ac27bfda
7 changed files with 645 additions and 78 deletions

View File

@ -594,17 +594,9 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
/*
* We currently only support writing to regular tables. However, give a
* more specific error for partitioned and foreign tables.
* Give a more specific error for foreign tables.
*/
if (relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
nspname, relname),
errdetail("\"%s.%s\" is a partitioned table.",
nspname, relname)));
else if (relkind == RELKIND_FOREIGN_TABLE)
if (relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
@ -612,7 +604,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
errdetail("\"%s.%s\" is a foreign table.",
nspname, relname)));
if (relkind != RELKIND_RELATION)
if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",

View File

@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
/*
* Partition map (LogicalRepPartMap)
*
* When a partitioned table is used as replication target, replicated
* operations are actually performed on its leaf partitions, which requires
* the partitions to also be mapped to the remote relation. Parent's entry
* (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
* individual partitions may have different attribute numbers, which means
* attribute mappings to remote relation's attributes must be maintained
* separately for each partition.
*/
static MemoryContext LogicalRepPartMapContext = NULL;
static HTAB *LogicalRepPartMap = NULL;
typedef struct LogicalRepPartMapEntry
{
Oid partoid; /* LogicalRepPartMap's key */
LogicalRepRelMapEntry relmapentry;
} LogicalRepPartMapEntry;
/*
* Relcache invalidation callback for our relation map cache.
@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid)
Assert(OidIsValid(entry->remoteid));
return psprintf("%s.%s", entry->nspname, entry->typname);
}
/*
* Partition cache: look up partition LogicalRepRelMapEntry's
*
* Unlike relation map cache, this is keyed by partition OID, not remote
* relation OID, because we only have to use this cache in the case where
* partitions are not directly mapped to any remote relation, such as when
* replication is occurring with one of their ancestors as target.
*/
/*
* Relcache invalidation callback
*/
static void
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
{
LogicalRepRelMapEntry *entry;
/* Just to be sure. */
if (LogicalRepPartMap == NULL)
return;
if (reloid != InvalidOid)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepPartMap);
/* TODO, use inverse lookup hashtable? */
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
{
if (entry->localreloid == reloid)
{
entry->localreloid = InvalidOid;
hash_seq_term(&status);
break;
}
}
}
else
{
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepPartMap);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
entry->localreloid = InvalidOid;
}
}
/*
* Initialize the partition map cache.
*/
static void
logicalrep_partmap_init(void)
{
HASHCTL ctl;
if (!LogicalRepPartMapContext)
LogicalRepPartMapContext =
AllocSetContextCreate(CacheMemoryContext,
"LogicalRepPartMapContext",
ALLOCSET_DEFAULT_SIZES);
/* Initialize the relation hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid); /* partition OID */
ctl.entrysize = sizeof(LogicalRepPartMapEntry);
ctl.hcxt = LogicalRepPartMapContext;
LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
(Datum) 0);
}
/*
* logicalrep_partition_open
*
* Returned entry reuses most of the values of the root table's entry, save
* the attribute map, which can be different for the partition.
*
* Note there's no logialrep_partition_close, because the caller closes the
* the component relation.
*/
LogicalRepRelMapEntry *
logicalrep_partition_open(LogicalRepRelMapEntry *root,
Relation partrel, AttrMap *map)
{
LogicalRepRelMapEntry *entry;
LogicalRepPartMapEntry *part_entry;
LogicalRepRelation *remoterel = &root->remoterel;
Oid partOid = RelationGetRelid(partrel);
AttrMap *attrmap = root->attrmap;
bool found;
int i;
MemoryContext oldctx;
if (LogicalRepPartMap == NULL)
logicalrep_partmap_init();
/* Search for existing entry. */
part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
(void *) &partOid,
HASH_ENTER, &found);
if (found)
return &part_entry->relmapentry;
memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
/* Switch to longer-lived context. */
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
part_entry->partoid = partOid;
/* Remote relation is used as-is from the root entry. */
entry = &part_entry->relmapentry;
entry->remoterel.remoteid = remoterel->remoteid;
entry->remoterel.nspname = pstrdup(remoterel->nspname);
entry->remoterel.relname = pstrdup(remoterel->relname);
entry->remoterel.natts = remoterel->natts;
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
for (i = 0; i < remoterel->natts; i++)
{
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
entry->localrel = partrel;
entry->localreloid = partOid;
/*
* If the partition's attributes don't match the root relation's, we'll
* need to make a new attrmap which maps partition attribute numbers to
* remoterel's, instead the original which maps root relation's attribute
* numbers to remoterel's.
*
* Note that 'map' which comes from the tuple routing data structure
* contains 1-based attribute numbers (of the parent relation). However,
* the map in 'entry', a logical replication data structure, contains
* 0-based attribute numbers (of the remote relation).
*/
if (map)
{
AttrNumber attno;
entry->attrmap = make_attrmap(map->maplen);
for (attno = 0; attno < entry->attrmap->maplen; attno++)
{
AttrNumber root_attno = map->attnums[attno];
entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
}
}
else
entry->attrmap = attrmap;
entry->updatable = root->updatable;
/* state and statelsn are left set to 0. */
MemoryContextSwitchTo(oldctx);
return entry;
}

View File

@ -762,7 +762,6 @@ copy_table(Relation rel)
/* Map the publisher relation to local one. */
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
/* Start copy on the publisher. */
initStringInfo(&cmd);

View File

@ -29,11 +29,14 @@
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
EState *estate,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry,
CmdType operation);
/*
* Should this worker apply changes for given relation.
@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s)
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
apply_handle_insert_internal(estate->es_result_relation_info, estate,
remoteslot);
/* For a partitioned table, insert the tuple into a partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(estate->es_result_relation_info, estate,
remoteslot, NULL, rel, CMD_INSERT);
else
apply_handle_insert_internal(estate->es_result_relation_info, estate,
remoteslot);
PopActiveSnapshot();
@ -767,9 +780,13 @@ apply_handle_update(StringInfo s)
has_oldtup ? oldtup.values : newtup.values);
MemoryContextSwitchTo(oldctx);
Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
apply_handle_update_internal(estate->es_result_relation_info, estate,
remoteslot, &newtup, rel);
/* For a partitioned table, apply update to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(estate->es_result_relation_info, estate,
remoteslot, &newtup, rel, CMD_UPDATE);
else
apply_handle_update_internal(estate->es_result_relation_info, estate,
remoteslot, &newtup, rel);
PopActiveSnapshot();
@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s)
slot_store_cstrings(remoteslot, rel, oldtup.values);
MemoryContextSwitchTo(oldctx);
Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
apply_handle_delete_internal(estate->es_result_relation_info, estate,
remoteslot, &rel->remoterel);
/* For a partitioned table, apply delete to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(estate->es_result_relation_info, estate,
remoteslot, NULL, rel, CMD_DELETE);
else
apply_handle_delete_internal(estate->es_result_relation_info, estate,
remoteslot, &rel->remoterel);
PopActiveSnapshot();
@ -975,6 +996,235 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
return found;
}
/*
* This handles insert, update, delete on a partitioned table.
*/
static void
apply_handle_tuple_routing(ResultRelInfo *relinfo,
EState *estate,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
LogicalRepRelMapEntry *relmapentry,
CmdType operation)
{
Relation parentrel = relinfo->ri_RelationDesc;
ModifyTableState *mtstate = NULL;
PartitionTupleRouting *proute = NULL;
ResultRelInfo *partrelinfo;
Relation partrel;
TupleTableSlot *remoteslot_part;
PartitionRoutingInfo *partinfo;
TupleConversionMap *map;
MemoryContext oldctx;
/* ModifyTableState is needed for ExecFindPartition(). */
mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = operation;
mtstate->resultRelInfo = relinfo;
proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
/*
* Find the partition to which the "search tuple" belongs.
*/
Assert(remoteslot != NULL);
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
remoteslot, estate);
Assert(partrelinfo != NULL);
partrel = partrelinfo->ri_RelationDesc;
/*
* To perform any of the operations below, the tuple must match the
* partition's rowtype. Convert if needed or just copy, using a dedicated
* slot to store the tuple in any case.
*/
partinfo = partrelinfo->ri_PartitionInfo;
remoteslot_part = partinfo->pi_PartitionTupleSlot;
if (remoteslot_part == NULL)
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
map = partinfo->pi_RootToPartitionMap;
if (map != NULL)
remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
remoteslot_part);
else
{
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
slot_getallattrs(remoteslot_part);
}
MemoryContextSwitchTo(oldctx);
estate->es_result_relation_info = partrelinfo;
switch (operation)
{
case CMD_INSERT:
apply_handle_insert_internal(partrelinfo, estate,
remoteslot_part);
break;
case CMD_DELETE:
apply_handle_delete_internal(partrelinfo, estate,
remoteslot_part,
&relmapentry->remoterel);
break;
case CMD_UPDATE:
/*
* For UPDATE, depending on whether or not the updated tuple
* satisfies the partition's constraint, perform a simple UPDATE
* of the partition or move the updated tuple into a different
* suitable partition.
*/
{
AttrMap *attrmap = map ? map->attrMap : NULL;
LogicalRepRelMapEntry *part_entry;
TupleTableSlot *localslot;
ResultRelInfo *partrelinfo_new;
bool found;
part_entry = logicalrep_partition_open(relmapentry, partrel,
attrmap);
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
remoteslot_part, &localslot);
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
if (found)
{
/* Apply the update. */
slot_modify_cstrings(remoteslot_part, localslot,
part_entry,
newtup->values, newtup->changed);
MemoryContextSwitchTo(oldctx);
}
else
{
/*
* The tuple to be updated could not be found.
*
* TODO what to do here, change the log level to LOG
* perhaps?
*/
elog(DEBUG1,
"logical replication did not find row for update "
"in replication target relation \"%s\"",
RelationGetRelationName(partrel));
}
/*
* Does the updated tuple still satisfy the current
* partition's constraint?
*/
if (partrelinfo->ri_PartitionCheck == NULL ||
ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
false))
{
/*
* Yes, so simply UPDATE the partition. We don't call
* apply_handle_update_internal() here, which would
* normally do the following work, to avoid repeating some
* work already done above to find the local tuple in the
* partition.
*/
EPQState epqstate;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(partrelinfo, false);
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
ExecSimpleRelationUpdate(estate, &epqstate, localslot,
remoteslot_part);
ExecCloseIndices(partrelinfo);
EvalPlanQualEnd(&epqstate);
}
else
{
/* Move the tuple into the new partition. */
/*
* New partition will be found using tuple routing, which
* can only occur via the parent table. We might need to
* convert the tuple to the parent's rowtype. Note that
* this is the tuple found in the partition, not the
* original search tuple received by this function.
*/
if (map)
{
TupleConversionMap *PartitionToRootMap =
convert_tuples_by_name(RelationGetDescr(partrel),
RelationGetDescr(parentrel));
remoteslot =
execute_attr_map_slot(PartitionToRootMap->attrMap,
remoteslot_part, remoteslot);
}
else
{
remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
slot_getallattrs(remoteslot);
}
/* Find the new partition. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
partrelinfo_new = ExecFindPartition(mtstate, relinfo,
proute, remoteslot,
estate);
MemoryContextSwitchTo(oldctx);
Assert(partrelinfo_new != partrelinfo);
/* DELETE old tuple found in the old partition. */
estate->es_result_relation_info = partrelinfo;
apply_handle_delete_internal(partrelinfo, estate,
localslot,
&relmapentry->remoterel);
/* INSERT new tuple into the new partition. */
/*
* Convert the replacement tuple to match the destination
* partition rowtype.
*/
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
partrel = partrelinfo_new->ri_RelationDesc;
partinfo = partrelinfo_new->ri_PartitionInfo;
remoteslot_part = partinfo->pi_PartitionTupleSlot;
if (remoteslot_part == NULL)
remoteslot_part = table_slot_create(partrel,
&estate->es_tupleTable);
map = partinfo->pi_RootToPartitionMap;
if (map != NULL)
{
remoteslot_part = execute_attr_map_slot(map->attrMap,
remoteslot,
remoteslot_part);
}
else
{
remoteslot_part = ExecCopySlot(remoteslot_part,
remoteslot);
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
estate->es_result_relation_info = partrelinfo_new;
apply_handle_insert_internal(partrelinfo_new, estate,
remoteslot_part);
}
}
break;
default:
elog(ERROR, "unrecognized CmdType: %d", (int) operation);
break;
}
ExecCleanupTupleRouting(mtstate, proute);
}
/*
* Handle TRUNCATE message.
*
@ -988,6 +1238,7 @@ apply_handle_truncate(StringInfo s)
List *remote_relids = NIL;
List *remote_rels = NIL;
List *rels = NIL;
List *part_rels = NIL;
List *relids = NIL;
List *relids_logged = NIL;
ListCell *lc;
@ -1017,6 +1268,47 @@ apply_handle_truncate(StringInfo s)
relids = lappend_oid(relids, rel->localreloid);
if (RelationIsLogicallyLogged(rel->localrel))
relids_logged = lappend_oid(relids_logged, rel->localreloid);
/*
* Truncate partitions if we got a message to truncate a partitioned
* table.
*/
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
ListCell *child;
List *children = find_all_inheritors(rel->localreloid,
RowExclusiveLock,
NULL);
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
Relation childrel;
if (list_member_oid(relids, childrelid))
continue;
/* find_all_inheritors already got lock */
childrel = table_open(childrelid, NoLock);
/*
* Ignore temp tables of other backends. See similar code in
* ExecuteTruncate().
*/
if (RELATION_IS_OTHER_TEMP(childrel))
{
table_close(childrel, RowExclusiveLock);
continue;
}
rels = lappend(rels, childrel);
part_rels = lappend(part_rels, childrel);
relids = lappend_oid(relids, childrelid);
/* Log this relation only if needed for logical decoding */
if (RelationIsLogicallyLogged(childrel))
relids_logged = lappend_oid(relids_logged, childrelid);
}
}
}
/*
@ -1032,6 +1324,12 @@ apply_handle_truncate(StringInfo s)
logicalrep_rel_close(rel, NoLock);
}
foreach(lc, part_rels)
{
Relation rel = lfirst(lc);
table_close(rel, NoLock);
}
CommandCounterIncrement();
}