mirror of
https://github.com/postgres/postgres.git
synced 2025-04-27 22:56:53 +03:00
Support adding partitioned tables to publication
When a partitioned table is added to a publication, changes of all of its partitions (current or future) are published via that publication. This change only affects which tables a publication considers as its members. The receiving side still sees the data coming from the individual leaf partitions. So existing restrictions that partition hierarchies can only be replicated one-to-one are not changed by this. Author: Amit Langote <amitlangote09@gmail.com> Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com> Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com> Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
This commit is contained in:
parent
61d7c7bce3
commit
17b9e7f9fe
@ -402,13 +402,16 @@
|
|||||||
|
|
||||||
<listitem>
|
<listitem>
|
||||||
<para>
|
<para>
|
||||||
Replication is only possible from base tables to base tables. That is,
|
Replication is only supported by tables, partitioned or not, although a
|
||||||
the tables on the publication and on the subscription side must be normal
|
given table must either be partitioned on both servers or not partitioned
|
||||||
tables, not views, materialized views, partition root tables, or foreign
|
at all. Also, when replicating between partitioned tables, the actual
|
||||||
tables. In the case of partitions, you can therefore replicate a
|
replication occurs between leaf partitions, so partitions on the two
|
||||||
partition hierarchy one-to-one, but you cannot currently replicate to a
|
servers must match one-to-one.
|
||||||
differently partitioned setup. Attempts to replicate tables other than
|
</para>
|
||||||
base tables will result in an error.
|
|
||||||
|
<para>
|
||||||
|
Attempts to replicate other types of relations such as views, materialized
|
||||||
|
views, or foreign tables, will result in an error.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
</itemizedlist>
|
</itemizedlist>
|
||||||
|
@ -69,14 +69,23 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
|
|||||||
specified, the table and all its descendant tables (if any) are added.
|
specified, the table and all its descendant tables (if any) are added.
|
||||||
Optionally, <literal>*</literal> can be specified after the table name to
|
Optionally, <literal>*</literal> can be specified after the table name to
|
||||||
explicitly indicate that descendant tables are included.
|
explicitly indicate that descendant tables are included.
|
||||||
|
This does not apply to a partitioned table, however. The partitions of
|
||||||
|
a partitioned table are always implicitly considered part of the
|
||||||
|
publication, so they are never explicitly added to the publication.
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
Only persistent base tables can be part of a publication. Temporary
|
Only persistent base tables and partitioned tables can be part of a
|
||||||
tables, unlogged tables, foreign tables, materialized views, regular
|
publication. Temporary tables, unlogged tables, foreign tables,
|
||||||
views, and partitioned tables cannot be part of a publication. To
|
materialized views, and regular views cannot be part of a publication.
|
||||||
replicate a partitioned table, add the individual partitions to the
|
</para>
|
||||||
publication.
|
|
||||||
|
<para>
|
||||||
|
When a partitioned table is added to a publication, all of its existing
|
||||||
|
and future partitions are implicitly considered to be part of the
|
||||||
|
publication. So, even operations that are performed directly on a
|
||||||
|
partition are also published via publications that its ancestors are
|
||||||
|
part of.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
@ -24,8 +24,10 @@
|
|||||||
#include "catalog/index.h"
|
#include "catalog/index.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
|
#include "catalog/partition.h"
|
||||||
#include "catalog/objectaccess.h"
|
#include "catalog/objectaccess.h"
|
||||||
#include "catalog/objectaddress.h"
|
#include "catalog/objectaddress.h"
|
||||||
|
#include "catalog/pg_inherits.h"
|
||||||
#include "catalog/pg_publication.h"
|
#include "catalog/pg_publication.h"
|
||||||
#include "catalog/pg_publication_rel.h"
|
#include "catalog/pg_publication_rel.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
@ -40,6 +42,8 @@
|
|||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
static List *get_rel_publications(Oid relid);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if relation can be in given publication and throws appropriate
|
* Check if relation can be in given publication and throws appropriate
|
||||||
* error if not.
|
* error if not.
|
||||||
@ -47,17 +51,9 @@
|
|||||||
static void
|
static void
|
||||||
check_publication_add_relation(Relation targetrel)
|
check_publication_add_relation(Relation targetrel)
|
||||||
{
|
{
|
||||||
/* Give more specific error for partitioned tables */
|
/* Must be a regular or partitioned table */
|
||||||
if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
|
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
|
||||||
ereport(ERROR,
|
RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
|
||||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("\"%s\" is a partitioned table",
|
|
||||||
RelationGetRelationName(targetrel)),
|
|
||||||
errdetail("Adding partitioned tables to publications is not supported."),
|
|
||||||
errhint("You can add the table partitions individually.")));
|
|
||||||
|
|
||||||
/* Must be table */
|
|
||||||
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
errmsg("\"%s\" is not a table",
|
errmsg("\"%s\" is not a table",
|
||||||
@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel)
|
|||||||
static bool
|
static bool
|
||||||
is_publishable_class(Oid relid, Form_pg_class reltuple)
|
is_publishable_class(Oid relid, Form_pg_class reltuple)
|
||||||
{
|
{
|
||||||
return reltuple->relkind == RELKIND_RELATION &&
|
return (reltuple->relkind == RELKIND_RELATION ||
|
||||||
|
reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
|
||||||
!IsCatalogRelationOid(relid) &&
|
!IsCatalogRelationOid(relid) &&
|
||||||
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
|
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
|
||||||
relid >= FirstNormalObjectId;
|
relid >= FirstNormalObjectId;
|
||||||
@ -221,10 +218,35 @@ publication_add_relation(Oid pubid, Relation targetrel,
|
|||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Gets list of publication oids for a relation oid.
|
* Gets list of publication oids for a relation, plus those of ancestors,
|
||||||
|
* if any, if the relation is a partition.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
GetRelationPublications(Oid relid)
|
GetRelationPublications(Oid relid)
|
||||||
|
{
|
||||||
|
List *result = NIL;
|
||||||
|
|
||||||
|
result = get_rel_publications(relid);
|
||||||
|
if (get_rel_relispartition(relid))
|
||||||
|
{
|
||||||
|
List *ancestors = get_partition_ancestors(relid);
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
foreach(lc, ancestors)
|
||||||
|
{
|
||||||
|
Oid ancestor = lfirst_oid(lc);
|
||||||
|
List *ancestor_pubs = get_rel_publications(ancestor);
|
||||||
|
|
||||||
|
result = list_concat(result, ancestor_pubs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Workhorse of GetRelationPublications() */
|
||||||
|
static List *
|
||||||
|
get_rel_publications(Oid relid)
|
||||||
{
|
{
|
||||||
List *result = NIL;
|
List *result = NIL;
|
||||||
CatCList *pubrellist;
|
CatCList *pubrellist;
|
||||||
@ -253,7 +275,7 @@ GetRelationPublications(Oid relid)
|
|||||||
* should use GetAllTablesPublicationRelations().
|
* should use GetAllTablesPublicationRelations().
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
GetPublicationRelations(Oid pubid)
|
GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
|
||||||
{
|
{
|
||||||
List *result;
|
List *result;
|
||||||
Relation pubrelsrel;
|
Relation pubrelsrel;
|
||||||
@ -279,6 +301,30 @@ GetPublicationRelations(Oid pubid)
|
|||||||
|
|
||||||
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
|
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
|
||||||
|
|
||||||
|
if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
|
||||||
|
pub_partopt != PUBLICATION_PART_ROOT)
|
||||||
|
{
|
||||||
|
List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if (pub_partopt == PUBLICATION_PART_ALL)
|
||||||
|
result = list_concat(result, all_parts);
|
||||||
|
else if (pub_partopt == PUBLICATION_PART_LEAF)
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
foreach(lc, all_parts)
|
||||||
|
{
|
||||||
|
Oid partOid = lfirst_oid(lc);
|
||||||
|
|
||||||
|
if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
|
||||||
|
result = lappend_oid(result, partOid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
Assert(false);
|
||||||
|
}
|
||||||
|
else
|
||||||
result = lappend_oid(result, pubrel->prrelid);
|
result = lappend_oid(result, pubrel->prrelid);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -480,10 +526,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
|
|||||||
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
||||||
|
|
||||||
publication = GetPublicationByName(pubname, false);
|
publication = GetPublicationByName(pubname, false);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Publications support partitioned tables, although all changes are
|
||||||
|
* replicated using leaf partition identity and schema, so we only
|
||||||
|
* need those.
|
||||||
|
*/
|
||||||
if (publication->alltables)
|
if (publication->alltables)
|
||||||
tables = GetAllTablesPublicationRelations();
|
tables = GetAllTablesPublicationRelations();
|
||||||
else
|
else
|
||||||
tables = GetPublicationRelations(publication->oid);
|
tables = GetPublicationRelations(publication->oid,
|
||||||
|
PUBLICATION_PART_LEAF);
|
||||||
funcctx->user_fctx = (void *) tables;
|
funcctx->user_fctx = (void *) tables;
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldcontext);
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
@ -299,7 +299,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
List *relids = GetPublicationRelations(pubform->oid);
|
/*
|
||||||
|
* For any partitioned tables contained in the publication, we must
|
||||||
|
* invalidate all partitions contained in the respective partition
|
||||||
|
* trees, not just those explicitly mentioned in the publication.
|
||||||
|
*/
|
||||||
|
List *relids = GetPublicationRelations(pubform->oid,
|
||||||
|
PUBLICATION_PART_ALL);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We don't want to send too many individual messages, at some point
|
* We don't want to send too many individual messages, at some point
|
||||||
@ -356,7 +362,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
|
|||||||
PublicationDropTables(pubid, rels, false);
|
PublicationDropTables(pubid, rels, false);
|
||||||
else /* DEFELEM_SET */
|
else /* DEFELEM_SET */
|
||||||
{
|
{
|
||||||
List *oldrelids = GetPublicationRelations(pubid);
|
List *oldrelids = GetPublicationRelations(pubid,
|
||||||
|
PUBLICATION_PART_ROOT);
|
||||||
List *delrels = NIL;
|
List *delrels = NIL;
|
||||||
ListCell *oldlc;
|
ListCell *oldlc;
|
||||||
|
|
||||||
@ -498,7 +505,8 @@ RemovePublicationRelById(Oid proid)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Open relations specified by a RangeVar list.
|
* Open relations specified by a RangeVar list.
|
||||||
* The returned tables are locked in ShareUpdateExclusiveLock mode.
|
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
|
||||||
|
* add them to a publication.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
OpenTableList(List *tables)
|
OpenTableList(List *tables)
|
||||||
@ -539,8 +547,13 @@ OpenTableList(List *tables)
|
|||||||
rels = lappend(rels, rel);
|
rels = lappend(rels, rel);
|
||||||
relids = lappend_oid(relids, myrelid);
|
relids = lappend_oid(relids, myrelid);
|
||||||
|
|
||||||
/* Add children of this rel, if requested */
|
/*
|
||||||
if (recurse)
|
* Add children of this rel, if requested, so that they too are added
|
||||||
|
* to the publication. A partitioned table can't have any inheritance
|
||||||
|
* children other than its partitions, which need not be explicitly
|
||||||
|
* added to the publication.
|
||||||
|
*/
|
||||||
|
if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
|
||||||
{
|
{
|
||||||
List *children;
|
List *children;
|
||||||
ListCell *child;
|
ListCell *child;
|
||||||
|
@ -761,6 +761,7 @@ copy_table(Relation rel)
|
|||||||
/* Map the publisher relation to local one. */
|
/* Map the publisher relation to local one. */
|
||||||
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
|
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
|
||||||
Assert(rel == relmapentry->localrel);
|
Assert(rel == relmapentry->localrel);
|
||||||
|
Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
|
||||||
|
|
||||||
/* Start copy on the publisher. */
|
/* Start copy on the publisher. */
|
||||||
initStringInfo(&cmd);
|
initStringInfo(&cmd);
|
||||||
|
@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames);
|
|||||||
static void publication_invalidation_cb(Datum arg, int cacheid,
|
static void publication_invalidation_cb(Datum arg, int cacheid,
|
||||||
uint32 hashvalue);
|
uint32 hashvalue);
|
||||||
|
|
||||||
/* Entry in the map used to remember which relation schemas we sent. */
|
/*
|
||||||
|
* Entry in the map used to remember which relation schemas we sent.
|
||||||
|
*
|
||||||
|
* For partitions, 'pubactions' considers not only the table's own
|
||||||
|
* publications, but also those of all of its ancestors.
|
||||||
|
*/
|
||||||
typedef struct RelationSyncEntry
|
typedef struct RelationSyncEntry
|
||||||
{
|
{
|
||||||
Oid relid; /* relation oid */
|
Oid relid; /* relation oid */
|
||||||
@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||||||
if (!relentry->pubactions.pubtruncate)
|
if (!relentry->pubactions.pubtruncate)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Don't send partitioned tables, because partitions should be sent
|
||||||
|
* instead.
|
||||||
|
*/
|
||||||
|
if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
||||||
|
continue;
|
||||||
|
|
||||||
relids[nrelids++] = relid;
|
relids[nrelids++] = relid;
|
||||||
maybe_send_schema(ctx, relation, relentry);
|
maybe_send_schema(ctx, relation, relentry);
|
||||||
}
|
}
|
||||||
@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Find or create entry in the relation schema cache.
|
* Find or create entry in the relation schema cache.
|
||||||
|
*
|
||||||
|
* This looks up publications that the given relation is directly or
|
||||||
|
* indirectly part of (the latter if it's really the relation's ancestor that
|
||||||
|
* is part of a publication) and fills up the found entry with the information
|
||||||
|
* about which operations to publish.
|
||||||
*/
|
*/
|
||||||
static RelationSyncEntry *
|
static RelationSyncEntry *
|
||||||
get_rel_sync_entry(PGOutputData *data, Oid relid)
|
get_rel_sync_entry(PGOutputData *data, Oid relid)
|
||||||
|
@ -3981,8 +3981,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
|
|||||||
{
|
{
|
||||||
TableInfo *tbinfo = &tblinfo[i];
|
TableInfo *tbinfo = &tblinfo[i];
|
||||||
|
|
||||||
/* Only plain tables can be aded to publications. */
|
/*
|
||||||
if (tbinfo->relkind != RELKIND_RELATION)
|
* Only regular and partitioned tables can be added to
|
||||||
|
* publications.
|
||||||
|
*/
|
||||||
|
if (tbinfo->relkind != RELKIND_RELATION &&
|
||||||
|
tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -80,7 +80,24 @@ typedef struct Publication
|
|||||||
extern Publication *GetPublication(Oid pubid);
|
extern Publication *GetPublication(Oid pubid);
|
||||||
extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
|
extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
|
||||||
extern List *GetRelationPublications(Oid relid);
|
extern List *GetRelationPublications(Oid relid);
|
||||||
extern List *GetPublicationRelations(Oid pubid);
|
|
||||||
|
/*---------
|
||||||
|
* Expected values for pub_partopt parameter of GetRelationPublications(),
|
||||||
|
* which allows callers to specify which partitions of partitioned tables
|
||||||
|
* mentioned in the publication they expect to see.
|
||||||
|
*
|
||||||
|
* ROOT: only the table explicitly mentioned in the publication
|
||||||
|
* LEAF: only leaf partitions in given tree
|
||||||
|
* ALL: all partitions in given tree
|
||||||
|
*/
|
||||||
|
typedef enum PublicationPartOpt
|
||||||
|
{
|
||||||
|
PUBLICATION_PART_ROOT,
|
||||||
|
PUBLICATION_PART_LEAF,
|
||||||
|
PUBLICATION_PART_ALL,
|
||||||
|
} PublicationPartOpt;
|
||||||
|
|
||||||
|
extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
|
||||||
extern List *GetAllTablesPublications(void);
|
extern List *GetAllTablesPublications(void);
|
||||||
extern List *GetAllTablesPublicationRelations(void);
|
extern List *GetAllTablesPublicationRelations(void);
|
||||||
|
|
||||||
|
@ -116,6 +116,35 @@ Tables:
|
|||||||
|
|
||||||
DROP TABLE testpub_tbl3, testpub_tbl3a;
|
DROP TABLE testpub_tbl3, testpub_tbl3a;
|
||||||
DROP PUBLICATION testpub3, testpub4;
|
DROP PUBLICATION testpub3, testpub4;
|
||||||
|
-- Tests for partitioned tables
|
||||||
|
SET client_min_messages = 'ERROR';
|
||||||
|
CREATE PUBLICATION testpub_forparted;
|
||||||
|
CREATE PUBLICATION testpub_forparted1;
|
||||||
|
RESET client_min_messages;
|
||||||
|
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
|
||||||
|
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
|
||||||
|
-- works despite missing REPLICA IDENTITY, because updates are not replicated
|
||||||
|
UPDATE testpub_parted1 SET a = 1;
|
||||||
|
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
|
||||||
|
-- only parent is listed as being in publication, not the partition
|
||||||
|
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
|
||||||
|
\dRp+ testpub_forparted
|
||||||
|
Publication testpub_forparted
|
||||||
|
Owner | All tables | Inserts | Updates | Deletes | Truncates
|
||||||
|
--------------------------+------------+---------+---------+---------+-----------
|
||||||
|
regress_publication_user | f | t | t | t | t
|
||||||
|
Tables:
|
||||||
|
"public.testpub_parted"
|
||||||
|
|
||||||
|
-- should now fail, because parent's publication replicates updates
|
||||||
|
UPDATE testpub_parted1 SET a = 1;
|
||||||
|
ERROR: cannot update table "testpub_parted1" because it does not have a replica identity and publishes updates
|
||||||
|
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
|
||||||
|
ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
|
||||||
|
-- works again, because parent's publication is no longer considered
|
||||||
|
UPDATE testpub_parted1 SET a = 1;
|
||||||
|
DROP TABLE testpub_parted1;
|
||||||
|
DROP PUBLICATION testpub_forparted, testpub_forparted1;
|
||||||
-- fail - view
|
-- fail - view
|
||||||
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
|
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
|
||||||
ERROR: "testpub_view" is not a table
|
ERROR: "testpub_view" is not a table
|
||||||
@ -142,11 +171,6 @@ Tables:
|
|||||||
ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
|
ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
|
||||||
ERROR: "testpub_view" is not a table
|
ERROR: "testpub_view" is not a table
|
||||||
DETAIL: Only tables can be added to publications.
|
DETAIL: Only tables can be added to publications.
|
||||||
-- fail - partitioned table
|
|
||||||
ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
|
|
||||||
ERROR: "testpub_parted" is a partitioned table
|
|
||||||
DETAIL: Adding partitioned tables to publications is not supported.
|
|
||||||
HINT: You can add the table partitions individually.
|
|
||||||
ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
|
ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
|
||||||
ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
|
ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
|
||||||
ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
|
ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
|
||||||
|
@ -69,6 +69,27 @@ RESET client_min_messages;
|
|||||||
DROP TABLE testpub_tbl3, testpub_tbl3a;
|
DROP TABLE testpub_tbl3, testpub_tbl3a;
|
||||||
DROP PUBLICATION testpub3, testpub4;
|
DROP PUBLICATION testpub3, testpub4;
|
||||||
|
|
||||||
|
-- Tests for partitioned tables
|
||||||
|
SET client_min_messages = 'ERROR';
|
||||||
|
CREATE PUBLICATION testpub_forparted;
|
||||||
|
CREATE PUBLICATION testpub_forparted1;
|
||||||
|
RESET client_min_messages;
|
||||||
|
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
|
||||||
|
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
|
||||||
|
-- works despite missing REPLICA IDENTITY, because updates are not replicated
|
||||||
|
UPDATE testpub_parted1 SET a = 1;
|
||||||
|
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
|
||||||
|
-- only parent is listed as being in publication, not the partition
|
||||||
|
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
|
||||||
|
\dRp+ testpub_forparted
|
||||||
|
-- should now fail, because parent's publication replicates updates
|
||||||
|
UPDATE testpub_parted1 SET a = 1;
|
||||||
|
ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
|
||||||
|
-- works again, because parent's publication is no longer considered
|
||||||
|
UPDATE testpub_parted1 SET a = 1;
|
||||||
|
DROP TABLE testpub_parted1;
|
||||||
|
DROP PUBLICATION testpub_forparted, testpub_forparted1;
|
||||||
|
|
||||||
-- fail - view
|
-- fail - view
|
||||||
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
|
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
|
||||||
SET client_min_messages = 'ERROR';
|
SET client_min_messages = 'ERROR';
|
||||||
@ -83,8 +104,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
|
|||||||
|
|
||||||
-- fail - view
|
-- fail - view
|
||||||
ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
|
ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
|
||||||
-- fail - partitioned table
|
|
||||||
ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
|
|
||||||
|
|
||||||
ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
|
ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
|
||||||
ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
|
ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
|
||||||
|
178
src/test/subscription/t/013_partition.pl
Normal file
178
src/test/subscription/t/013_partition.pl
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
# Test logical replication with partitioned tables
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
use PostgresNode;
|
||||||
|
use TestLib;
|
||||||
|
use Test::More tests => 15;
|
||||||
|
|
||||||
|
# setup
|
||||||
|
|
||||||
|
my $node_publisher = get_new_node('publisher');
|
||||||
|
$node_publisher->init(allows_streaming => 'logical');
|
||||||
|
$node_publisher->start;
|
||||||
|
|
||||||
|
my $node_subscriber1 = get_new_node('subscriber1');
|
||||||
|
$node_subscriber1->init(allows_streaming => 'logical');
|
||||||
|
$node_subscriber1->start;
|
||||||
|
|
||||||
|
my $node_subscriber2 = get_new_node('subscriber2');
|
||||||
|
$node_subscriber2->init(allows_streaming => 'logical');
|
||||||
|
$node_subscriber2->start;
|
||||||
|
|
||||||
|
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
||||||
|
|
||||||
|
# publisher
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE PUBLICATION pub1");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE PUBLICATION pub_all FOR ALL TABLES");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1_1 (b text, a int NOT NULL)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
|
||||||
|
|
||||||
|
# subscriber1
|
||||||
|
$node_subscriber1->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)");
|
||||||
|
$node_subscriber1->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
|
||||||
|
$node_subscriber1->safe_psql('postgres',
|
||||||
|
"ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)");
|
||||||
|
$node_subscriber1->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)");
|
||||||
|
$node_subscriber1->safe_psql('postgres',
|
||||||
|
"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
|
||||||
|
|
||||||
|
# subscriber 2
|
||||||
|
$node_subscriber2->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)");
|
||||||
|
$node_subscriber2->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)");
|
||||||
|
$node_subscriber2->safe_psql('postgres',
|
||||||
|
"CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)");
|
||||||
|
$node_subscriber2->safe_psql('postgres',
|
||||||
|
"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all");
|
||||||
|
|
||||||
|
# Wait for initial sync of all subscriptions
|
||||||
|
my $synced_query =
|
||||||
|
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
||||||
|
$node_subscriber1->poll_query_until('postgres', $synced_query)
|
||||||
|
or die "Timed out while waiting for subscriber to synchronize data";
|
||||||
|
$node_subscriber2->poll_query_until('postgres', $synced_query)
|
||||||
|
or die "Timed out while waiting for subscriber to synchronize data";
|
||||||
|
|
||||||
|
# insert
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab1 VALUES (1)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab1_1 (a) VALUES (3)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab1_2 VALUES (5)");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub1');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
my $result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
|
||||||
|
is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
|
||||||
|
is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
|
||||||
|
is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
|
||||||
|
|
||||||
|
# update (no partition change)
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"UPDATE tab1 SET a = 2 WHERE a = 1");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub1');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
|
||||||
|
is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
|
||||||
|
is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
|
||||||
|
|
||||||
|
# update (partition changes)
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"UPDATE tab1 SET a = 6 WHERE a = 2");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub1');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
|
||||||
|
is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
|
||||||
|
is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
|
||||||
|
is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated');
|
||||||
|
|
||||||
|
# delete
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"DELETE FROM tab1 WHERE a IN (3, 5)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"DELETE FROM tab1_2");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub1');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1");
|
||||||
|
is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1_1");
|
||||||
|
is($result, qq(0||), 'delete from tab1_1 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1_2");
|
||||||
|
is($result, qq(0||), 'delete from tab1_2 replicated');
|
||||||
|
|
||||||
|
# truncate
|
||||||
|
$node_subscriber1->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab1 VALUES (1), (2), (5)");
|
||||||
|
$node_subscriber2->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab1_2 VALUES (2)");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"TRUNCATE tab1_2");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub1');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1");
|
||||||
|
is($result, qq(2|1|2), 'truncate of tab1_2 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1_2");
|
||||||
|
is($result, qq(0||), 'truncate of tab1_2 replicated');
|
||||||
|
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"TRUNCATE tab1");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub1');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1");
|
||||||
|
is($result, qq(0||), 'truncate of tab1_1 replicated');
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT count(*), min(a), max(a) FROM tab1");
|
||||||
|
is($result, qq(0||), 'truncate of tab1_1 replicated');
|
Loading…
x
Reference in New Issue
Block a user