1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-24 01:29:19 +03:00

Invalidate all partitions for a partitioned table in publication.

Updates/Deletes on a partition were allowed even without replica identity
after the parent table was added to a publication. This would later lead
to an error on subscribers. The reason was that we were not invalidating
the partition's relcache and the publication information for partitions
was not getting rebuilt. Similarly, we were not invalidating the
partitions' relcache after dropping a partitioned table from a publication
which will prohibit Updates/Deletes on its partition without replica
identity even without any publication.

Reported-by: Haiying Tang
Author: Hou Zhijie and Vignesh C
Reviewed-by: Vignesh C and Amit Kapila
Backpatch-through: 13
Discussion: https://postgr.es/m/OS0PR01MB6113D77F583C922F1CEAA1C3FBD29@OS0PR01MB6113.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila
2021-09-22 08:00:54 +05:30
parent 5e77625b26
commit 4548c76738
6 changed files with 116 additions and 55 deletions

View File

@@ -31,6 +31,7 @@
#include "catalog/pg_publication.h" #include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h" #include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/publicationcmds.h"
#include "funcapi.h" #include "funcapi.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/array.h" #include "utils/array.h"
@@ -136,6 +137,42 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(result); PG_RETURN_BOOL(result);
} }
/*
* Gets the relations based on the publication partition option for a specified
* relation.
*/
List *
GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
Oid relid)
{
if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
pub_partopt != PUBLICATION_PART_ROOT)
{
List *all_parts = find_all_inheritors(relid, NoLock,
NULL);
if (pub_partopt == PUBLICATION_PART_ALL)
result = list_concat(result, all_parts);
else if (pub_partopt == PUBLICATION_PART_LEAF)
{
ListCell *lc;
foreach(lc, all_parts)
{
Oid partOid = lfirst_oid(lc);
if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
result = lappend_oid(result, partOid);
}
}
else
Assert(false);
}
else
result = lappend_oid(result, relid);
return result;
}
/* /*
* Insert new publication / relation mapping. * Insert new publication / relation mapping.
@@ -153,6 +190,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
Publication *pub = GetPublication(pubid); Publication *pub = GetPublication(pubid);
ObjectAddress myself, ObjectAddress myself,
referenced; referenced;
List *relids = NIL;
rel = table_open(PublicationRelRelationId, RowExclusiveLock); rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -208,8 +246,18 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
/* Close the table. */ /* Close the table. */
table_close(rel, RowExclusiveLock); table_close(rel, RowExclusiveLock);
/* Invalidate relcache so that publication info is rebuilt. */ /*
CacheInvalidateRelcache(targetrel->relation); * Invalidate relcache so that publication info is rebuilt.
*
* For the partitioned tables, we must invalidate all partitions contained
* in the respective partition hierarchies, not just the one explicitly
* mentioned in the publication. This is required because we implicitly
* publish the child tables when the parent table is published.
*/
relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
relid);
InvalidatePublicationRels(relids);
return myself; return myself;
} }
@@ -241,7 +289,7 @@ GetRelationPublications(Oid relid)
/* /*
* Gets list of relation oids for a publication. * Gets list of relation oids for a publication.
* *
* This should only be used for normal publications, the FOR ALL TABLES * This should only be used FOR TABLE publications, the FOR ALL TABLES
* should use GetAllTablesPublicationRelations(). * should use GetAllTablesPublicationRelations().
*/ */
List * List *
@@ -270,32 +318,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Form_pg_publication_rel pubrel; Form_pg_publication_rel pubrel;
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
result = GetPubPartitionOptionRelations(result, pub_partopt,
if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && pubrel->prrelid);
pub_partopt != PUBLICATION_PART_ROOT)
{
List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
NULL);
if (pub_partopt == PUBLICATION_PART_ALL)
result = list_concat(result, all_parts);
else if (pub_partopt == PUBLICATION_PART_LEAF)
{
ListCell *lc;
foreach(lc, all_parts)
{
Oid partOid = lfirst_oid(lc);
if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
result = lappend_oid(result, partOid);
}
}
else
Assert(false);
}
else
result = lappend_oid(result, pubrel->prrelid);
} }
systable_endscan(scan); systable_endscan(scan);

View File

@@ -45,9 +45,6 @@
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/varlena.h" #include "utils/varlena.h"
/* Same as MAXNUMMESSAGES in sinvaladt.c */
#define MAX_RELCACHE_INVAL_MSGS 4096
static List *OpenTableList(List *tables); static List *OpenTableList(List *tables);
static void CloseTableList(List *rels); static void CloseTableList(List *rels);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
@@ -329,23 +326,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
List *relids = GetPublicationRelations(pubform->oid, List *relids = GetPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL); PUBLICATION_PART_ALL);
/* InvalidatePublicationRels(relids);
* We don't want to send too many individual messages, at some point
* it's cheaper to just reset whole relcache.
*/
if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
{
ListCell *lc;
foreach(lc, relids)
{
Oid relid = lfirst_oid(lc);
CacheInvalidateRelcacheByRelid(relid);
}
}
else
CacheInvalidateRelcacheAll();
} }
ObjectAddressSet(obj, PublicationRelationId, pubform->oid); ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
@@ -355,6 +336,27 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0); InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
} }
/*
* Invalidate the relations.
*/
void
InvalidatePublicationRels(List *relids)
{
/*
* We don't want to send too many individual messages, at some point it's
* cheaper to just reset whole relcache.
*/
if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
{
ListCell *lc;
foreach(lc, relids)
CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
}
else
CacheInvalidateRelcacheAll();
}
/* /*
* Add or remove table to/from publication. * Add or remove table to/from publication.
*/ */
@@ -488,6 +490,7 @@ RemovePublicationRelById(Oid proid)
Relation rel; Relation rel;
HeapTuple tup; HeapTuple tup;
Form_pg_publication_rel pubrel; Form_pg_publication_rel pubrel;
List *relids = NIL;
rel = table_open(PublicationRelRelationId, RowExclusiveLock); rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -499,8 +502,18 @@ RemovePublicationRelById(Oid proid)
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
/* Invalidate relcache so that publication info is rebuilt. */ /*
CacheInvalidateRelcacheByRelid(pubrel->prrelid); * Invalidate relcache so that publication info is rebuilt.
*
* For the partitioned tables, we must invalidate all partitions contained
* in the respective partition hierarchies, not just the one explicitly
* mentioned in the publication. This is required because we implicitly
* publish the child tables when the parent table is published.
*/
relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
pubrel->prrelid);
InvalidatePublicationRels(relids);
CatalogTupleDelete(rel, &tup->t_self); CatalogTupleDelete(rel, &tup->t_self);

View File

@@ -111,6 +111,9 @@ typedef enum PublicationPartOpt
extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern List *GetAllTablesPublicationRelations(bool pubviaroot);
extern List *GetPubPartitionOptionRelations(List *result,
PublicationPartOpt pub_partopt,
Oid relid);
extern bool is_publishable_relation(Relation rel); extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,

View File

@@ -17,6 +17,10 @@
#include "catalog/objectaddress.h" #include "catalog/objectaddress.h"
#include "parser/parse_node.h" #include "parser/parse_node.h"
#include "utils/inval.h"
/* Same as MAXNUMMESSAGES in sinvaladt.c */
#define MAX_RELCACHE_INVAL_MSGS 4096
extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt); extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
@@ -25,5 +29,6 @@ extern void RemovePublicationRelById(Oid proid);
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
extern void InvalidatePublicationRels(List *relids);
#endif /* PUBLICATIONCMDS_H */ #endif /* PUBLICATIONCMDS_H */

View File

@@ -126,10 +126,12 @@ CREATE PUBLICATION testpub_forparted;
CREATE PUBLICATION testpub_forparted1; CREATE PUBLICATION testpub_forparted1;
RESET client_min_messages; RESET client_min_messages;
CREATE TABLE testpub_parted1 (LIKE testpub_parted); CREATE TABLE testpub_parted1 (LIKE testpub_parted);
CREATE TABLE testpub_parted2 (LIKE testpub_parted);
ALTER PUBLICATION testpub_forparted1 SET (publish='insert'); ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
-- works despite missing REPLICA IDENTITY, because updates are not replicated -- works despite missing REPLICA IDENTITY, because updates are not replicated
UPDATE testpub_parted1 SET a = 1; UPDATE testpub_parted1 SET a = 1;
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition -- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted \dRp+ testpub_forparted
@@ -156,7 +158,14 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
Tables: Tables:
"public.testpub_parted" "public.testpub_parted"
DROP TABLE testpub_parted1; -- still fail, because parent's publication replicates updates
UPDATE testpub_parted2 SET a = 2;
ERROR: cannot update table "testpub_parted2" because it does not have a replica identity and publishes updates
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1; DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication -- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR'; SET client_min_messages = 'ERROR';

View File

@@ -76,10 +76,12 @@ CREATE PUBLICATION testpub_forparted;
CREATE PUBLICATION testpub_forparted1; CREATE PUBLICATION testpub_forparted1;
RESET client_min_messages; RESET client_min_messages;
CREATE TABLE testpub_parted1 (LIKE testpub_parted); CREATE TABLE testpub_parted1 (LIKE testpub_parted);
CREATE TABLE testpub_parted2 (LIKE testpub_parted);
ALTER PUBLICATION testpub_forparted1 SET (publish='insert'); ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
-- works despite missing REPLICA IDENTITY, because updates are not replicated -- works despite missing REPLICA IDENTITY, because updates are not replicated
UPDATE testpub_parted1 SET a = 1; UPDATE testpub_parted1 SET a = 1;
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition -- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted \dRp+ testpub_forparted
@@ -90,7 +92,12 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
UPDATE testpub_parted1 SET a = 1; UPDATE testpub_parted1 SET a = 1;
ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
\dRp+ testpub_forparted \dRp+ testpub_forparted
DROP TABLE testpub_parted1; -- still fail, because parent's publication replicates updates
UPDATE testpub_parted2 SET a = 2;
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1; DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication -- Test cache invalidation FOR ALL TABLES publication