1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Allow publishing partition changes via ancestors

To control whether partition changes are replicated using their own
identity and schema or an ancestor's, add a new parameter that can be
set per publication named 'publish_via_partition_root'.

This allows replicating a partitioned table into a different partition
structure on the subscriber.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
This commit is contained in:
Peter Eisentraut
2020-04-08 09:59:27 +02:00
parent 1aac32df89
commit 83fd4532a7
15 changed files with 724 additions and 174 deletions

View File

@ -42,8 +42,6 @@
#include "utils/rel.h"
#include "utils/syscache.h"
static List *get_rel_publications(Oid relid);
/*
* Check if relation can be in given publication and throws appropriate
* error if not.
@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel,
return myself;
}
/*
* Gets list of publication oids for a relation, plus those of ancestors,
* if any, if the relation is a partition.
*/
/* Gets list of publication oids for a relation */
List *
GetRelationPublications(Oid relid)
{
List *result = NIL;
result = get_rel_publications(relid);
if (get_rel_relispartition(relid))
{
List *ancestors = get_partition_ancestors(relid);
ListCell *lc;
foreach(lc, ancestors)
{
Oid ancestor = lfirst_oid(lc);
List *ancestor_pubs = get_rel_publications(ancestor);
result = list_concat(result, ancestor_pubs);
}
}
return result;
}
/* Workhorse of GetRelationPublications() */
static List *
get_rel_publications(Oid relid)
{
List *result = NIL;
CatCList *pubrellist;
@ -373,9 +343,13 @@ GetAllTablesPublications(void)
/*
* Gets list of all relation published by FOR ALL TABLES publication(s).
*
* If the publication publishes partition changes via their respective root
* partitioned tables, we must exclude partitions in favor of including the
* root partitioned tables.
*/
List *
GetAllTablesPublicationRelations(void)
GetAllTablesPublicationRelations(bool pubviaroot)
{
Relation classRel;
ScanKeyData key[1];
@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void)
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
Oid relid = relForm->oid;
if (is_publishable_class(relid, relForm))
if (is_publishable_class(relid, relForm) &&
!(relForm->relispartition && pubviaroot))
result = lappend_oid(result, relid);
}
table_endscan(scan);
table_close(classRel, AccessShareLock);
if (pubviaroot)
{
ScanKeyInit(&key[0],
Anum_pg_class_relkind,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(RELKIND_PARTITIONED_TABLE));
scan = table_beginscan_catalog(classRel, 1, key);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
Oid relid = relForm->oid;
if (is_publishable_class(relid, relForm) &&
!relForm->relispartition)
result = lappend_oid(result, relid);
}
table_endscan(scan);
table_close(classRel, AccessShareLock);
}
return result;
}
@ -433,6 +430,7 @@ GetPublication(Oid pubid)
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
pub->pubviaroot = pubform->pubviaroot;
ReleaseSysCache(tup);
@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* need those.
*/
if (publication->alltables)
tables = GetAllTablesPublicationRelations();
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
else
tables = GetPublicationRelations(publication->oid,
publication->pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
funcctx->user_fctx = (void *) tables;

View File

@ -23,6 +23,7 @@
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
static void
parse_publication_options(List *options,
bool *publish_given,
bool *publish_insert,
bool *publish_update,
bool *publish_delete,
bool *publish_truncate)
PublicationActions *pubactions,
bool *publish_via_partition_root_given,
bool *publish_via_partition_root)
{
ListCell *lc;
*publish_given = false;
*publish_via_partition_root_given = false;
/* Defaults are true */
*publish_insert = true;
*publish_update = true;
*publish_delete = true;
*publish_truncate = true;
/* defaults */
pubactions->pubinsert = true;
pubactions->pubupdate = true;
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
*publish_via_partition_root = false;
/* Parse options */
foreach(lc, options)
@ -91,10 +93,10 @@ parse_publication_options(List *options,
* If publish option was given only the explicitly listed actions
* should be published.
*/
*publish_insert = false;
*publish_update = false;
*publish_delete = false;
*publish_truncate = false;
pubactions->pubinsert = false;
pubactions->pubupdate = false;
pubactions->pubdelete = false;
pubactions->pubtruncate = false;
*publish_given = true;
publish = defGetString(defel);
@ -110,19 +112,28 @@ parse_publication_options(List *options,
char *publish_opt = (char *) lfirst(lc);
if (strcmp(publish_opt, "insert") == 0)
*publish_insert = true;
pubactions->pubinsert = true;
else if (strcmp(publish_opt, "update") == 0)
*publish_update = true;
pubactions->pubupdate = true;
else if (strcmp(publish_opt, "delete") == 0)
*publish_delete = true;
pubactions->pubdelete = true;
else if (strcmp(publish_opt, "truncate") == 0)
*publish_truncate = true;
pubactions->pubtruncate = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
}
}
else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
{
if (*publish_via_partition_root_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*publish_via_partition_root_given = true;
*publish_via_partition_root = defGetBoolean(defel);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt)
Datum values[Natts_pg_publication];
HeapTuple tup;
bool publish_given;
bool publish_insert;
bool publish_update;
bool publish_delete;
bool publish_truncate;
PublicationActions pubactions;
bool publish_via_partition_root_given;
bool publish_via_partition_root;
AclResult aclresult;
/* must have CREATE privilege on database */
@ -183,9 +193,9 @@ CreatePublication(CreatePublicationStmt *stmt)
values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
&publish_update, &publish_delete,
&publish_truncate);
&publish_given, &pubactions,
&publish_via_partition_root_given,
&publish_via_partition_root);
puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
Anum_pg_publication_oid);
@ -193,13 +203,15 @@ CreatePublication(CreatePublicationStmt *stmt)
values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
values[Anum_pg_publication_pubinsert - 1] =
BoolGetDatum(publish_insert);
BoolGetDatum(pubactions.pubinsert);
values[Anum_pg_publication_pubupdate - 1] =
BoolGetDatum(publish_update);
BoolGetDatum(pubactions.pubupdate);
values[Anum_pg_publication_pubdelete - 1] =
BoolGetDatum(publish_delete);
BoolGetDatum(pubactions.pubdelete);
values[Anum_pg_publication_pubtruncate - 1] =
BoolGetDatum(publish_truncate);
BoolGetDatum(pubactions.pubtruncate);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
bool replaces[Natts_pg_publication];
Datum values[Natts_pg_publication];
bool publish_given;
bool publish_insert;
bool publish_update;
bool publish_delete;
bool publish_truncate;
PublicationActions pubactions;
bool publish_via_partition_root_given;
bool publish_via_partition_root;
ObjectAddress obj;
Form_pg_publication pubform;
parse_publication_options(stmt->options,
&publish_given, &publish_insert,
&publish_update, &publish_delete,
&publish_truncate);
&publish_given, &pubactions,
&publish_via_partition_root_given,
&publish_via_partition_root);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
if (publish_given)
{
values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
replaces[Anum_pg_publication_pubinsert - 1] = true;
values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
replaces[Anum_pg_publication_pubupdate - 1] = true;
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
replaces[Anum_pg_publication_pubdelete - 1] = true;
values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
replaces[Anum_pg_publication_pubtruncate - 1] = true;
}
if (publish_via_partition_root_given)
{
values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
replaces[Anum_pg_publication_pubviaroot - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);

View File

@ -12,6 +12,8 @@
*/
#include "postgres.h"
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "fmgr.h"
#include "replication/logical.h"
@ -20,6 +22,7 @@
#include "replication/pgoutput.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@ -49,6 +52,7 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
@ -59,9 +63,31 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
bool schema_sent; /* did we send the schema? */
/*
* Did we send the schema? If ancestor relid is set, its schema must also
* have been sent for this to be true.
*/
bool schema_sent;
bool replicate_valid;
PublicationActions pubactions;
/*
* OID of the relation to publish changes as. For a partition, this may
* be set to one of its ancestors whose schema will be used when
* replicating changes, if publish_via_partition_root is set for the
* publication.
*/
Oid publish_as_relid;
/*
* Map used when replicating using an ancestor's schema to convert tuples
* from partition's type to the ancestor's; NULL if publish_as_relid is
* same as 'relid' or if unnecessary due to partition and the ancestor
* having identical TupleDesc.
*/
TupleConversionMap *map;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
@ -259,47 +285,71 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
* Write the relation schema if the current schema hasn't been sent yet.
* Write the current schema of the relation and its ancestor (if any) if not
* done yet.
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
Relation relation, RelationSyncEntry *relentry)
{
if (!relentry->schema_sent)
if (relentry->schema_sent)
return;
/* If needed, send the ancestor's schema first. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
TupleDesc desc;
int i;
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
TupleDesc indesc = RelationGetDescr(relation);
TupleDesc outdesc = RelationGetDescr(ancestor);
MemoryContext oldctx;
desc = RelationGetDescr(relation);
/* Map must live as long as the session does. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
relentry->map = convert_tuples_by_name(indesc, outdesc);
MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, ctx);
RelationClose(ancestor);
}
/*
* Write out type info if needed. We do that only for user-created
* types. We use FirstGenbkiObjectId as the cutoff, so that we only
* consider objects with hand-assigned OIDs to be "built in", not for
* instance any function or type defined in the information_schema.
* This is important because only hand-assigned OIDs can be expected
* to remain stable across major versions.
*/
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
send_relation_and_attrs(relation, ctx);
relentry->schema_sent = true;
}
if (att->attisdropped || att->attgenerated)
continue;
/*
* Sends a relation
*/
static void
send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
if (att->atttypid < FirstGenbkiObjectId)
continue;
/*
* Write out type info if needed. We do that only for user-created types.
* We use FirstGenbkiObjectId as the cutoff, so that we only consider
* objects with hand-assigned OIDs to be "built in", not for instance any
* function or type defined in the information_schema. This is important
* because only hand-assigned OIDs can be expected to remain stable across
* major versions.
*/
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, att->atttypid);
OutputPluginWrite(ctx, false);
}
if (att->attisdropped || att->attgenerated)
continue;
if (att->atttypid < FirstGenbkiObjectId)
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, relation);
logicalrep_write_typ(ctx->out, att->atttypid);
OutputPluginWrite(ctx, false);
relentry->schema_sent = true;
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, relation);
OutputPluginWrite(ctx, false);
}
/*
@ -346,28 +396,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation,
&change->data.tp.newtuple->tuple);
OutputPluginWrite(ctx, true);
break;
{
HeapTuple tuple = &change->data.tp.newtuple->tuple;
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
relation = RelationIdGetRelation(relentry->publish_as_relid);
/* Convert tuple if needed. */
if (relentry->map)
tuple = execute_attr_map_tuple(tuple, relentry->map);
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, tuple);
OutputPluginWrite(ctx, true);
break;
}
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple oldtuple = change->data.tp.oldtuple ?
&change->data.tp.oldtuple->tuple : NULL;
HeapTuple newtuple = &change->data.tp.newtuple->tuple;
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
relation = RelationIdGetRelation(relentry->publish_as_relid);
/* Convert tuples if needed. */
if (relentry->map)
{
oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
newtuple = execute_attr_map_tuple(newtuple, relentry->map);
}
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple,
&change->data.tp.newtuple->tuple);
logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
OutputPluginWrite(ctx, true);
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
relation = RelationIdGetRelation(relentry->publish_as_relid);
/* Convert tuple if needed. */
if (relentry->map)
oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, relation,
&change->data.tp.oldtuple->tuple);
logicalrep_write_delete(ctx->out, relation, oldtuple);
OutputPluginWrite(ctx, true);
}
else
@ -412,10 +499,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
/*
* Don't send partitioned tables, because partitions should be sent
* instead.
* Don't send partitions if the publication wants to send only the
* root tables through it.
*/
if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
if (relation->rd_rel->relispartition &&
relentry->publish_as_relid != relid)
continue;
relids[nrelids++] = relid;
@ -540,12 +628,15 @@ init_rel_sync_cache(MemoryContext cachectx)
* This looks up publications that the given relation is directly or
* indirectly part of (the latter if it's really the relation's ancestor that
* is part of a publication) and fills up the found entry with the information
* about which operations to publish.
* about which operations to publish and whether to use an ancestor's schema
* when publishing.
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Oid relid)
{
RelationSyncEntry *entry;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
bool found;
MemoryContext oldctx;
@ -564,6 +655,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
{
List *pubids = GetRelationPublications(relid);
ListCell *lc;
Oid publish_as_relid = relid;
/* Reload publications if needed before use. */
if (!publications_valid)
@ -588,8 +680,56 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
foreach(lc, data->publications)
{
Publication *pub = lfirst(lc);
bool publish = false;
if (pub->alltables || list_member_oid(pubids, pub->oid))
if (pub->alltables)
{
publish = true;
if (pub->pubviaroot && am_partition)
publish_as_relid = llast_oid(get_partition_ancestors(relid));
}
if (!publish)
{
bool ancestor_published = false;
/*
* For a partition, check if any of the ancestors are
* published. If so, note down the topmost ancestor that is
* published via this publication, which will be used as the
* relation via which to publish the partition's changes.
*/
if (am_partition)
{
List *ancestors = get_partition_ancestors(relid);
ListCell *lc2;
/* Find the "topmost" ancestor that is in this publication. */
foreach(lc2, ancestors)
{
Oid ancestor = lfirst_oid(lc2);
if (list_member_oid(GetRelationPublications(ancestor),
pub->oid))
{
ancestor_published = true;
if (pub->pubviaroot)
publish_as_relid = ancestor;
}
}
}
if (list_member_oid(pubids, pub->oid) || ancestor_published)
publish = true;
}
/*
* Don't publish changes for partitioned tables, because
* publishing those of its partitions suffices, unless partition
* changes won't be published due to pubviaroot being set.
*/
if (publish &&
(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
{
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
@ -604,6 +744,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
list_free(pubids);
entry->publish_as_relid = publish_as_relid;
entry->replicate_valid = true;
}

View File

@ -44,6 +44,7 @@
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_am.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_attrdef.h"
@ -5314,6 +5315,20 @@ GetRelationPublicationActions(Relation relation)
/* Fetch the publication membership info. */
puboids = GetRelationPublications(RelationGetRelid(relation));
if (relation->rd_rel->relispartition)
{
/* Add publications that the ancestors are in too. */
List *ancestors = get_partition_ancestors(RelationGetRelid(relation));
ListCell *lc;
foreach(lc, ancestors)
{
Oid ancestor = lfirst_oid(lc);
puboids = list_concat_unique_oid(puboids,
GetRelationPublications(ancestor));
}
}
puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
foreach(lc, puboids)