diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index fd6910ddbea..00b648a4333 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -240,6 +240,11 @@ publications for logical replication + + pg_publication_namespace + schema to publication mapping + + pg_publication_rel relation to publication mapping @@ -6176,6 +6181,67 @@ SCRAM-SHA-256$<iteration count>:&l + + <structname>pg_publication_namespace</structname> + + + pg_publication_namespace + + + + The catalog pg_publication_namespace contains the + mapping between schemas and publications in the database. This is a + many-to-many mapping. + + + + <structname>pg_publication_namespace</structname> Columns + + + + + Column Type + + + Description + + + + + + + + oid oid + + + Row identifier + + + + + + pnpubid oid + (references pg_publication.oid) + + + Reference to publication + + + + + + pnnspid oid + (references pg_namespace.oid) + + + Reference to schema + + + + +
+
+ <structname>pg_publication_rel</structname> @@ -11278,9 +11344,9 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx information about the mapping between publications and the tables they contain. Unlike the underlying catalog pg_publication_rel, - this view expands - publications defined as FOR ALL TABLES, so for such - publications there will be a row for each eligible table. + this view expands publications defined as FOR ALL TABLES + and FOR ALL TABLES IN SCHEMA, so for such publications + there will be a row for each eligible table. diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 88646bc859d..45b2e1e28f8 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -108,9 +108,9 @@ Publications are different from schemas and do not affect how the table is accessed. Each table can be added to multiple publications if needed. - Publications may currently only contain tables. Objects must be added - explicitly, except when a publication is created for ALL - TABLES. + Publications may currently only contain tables and all tables in schema. + Objects must be added explicitly, except when a publication is created for + ALL TABLES. @@ -534,7 +534,8 @@ and TRIGGER privilege on such tables to roles that superusers trust. Moreover, if untrusted users can create tables, use only publications that list tables explicitly. That is to say, create a - subscription FOR ALL TABLES only when superusers trust + subscription FOR ALL TABLES or + FOR ALL TABLES IN SCHEMA only when superusers trust every user permitted to create a non-temp table on the publisher or the subscriber. @@ -564,8 +565,9 @@ To add tables to a publication, the user must have ownership rights on the - table. To create a publication that publishes all tables automatically, - the user must be a superuser. + table. To add all tables in schema to a publication, the user must be a + superuser. To create a publication that publishes all tables or all tables in + schema automatically, the user must be a superuser. diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index faa114b2c68..bb4ef5e5e22 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -21,12 +21,17 @@ PostgreSQL documentation -ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [, ...] -ALTER PUBLICATION name SET TABLE [ ONLY ] table_name [ * ] [, ...] -ALTER PUBLICATION name DROP TABLE [ ONLY ] table_name [ * ] [, ...] +ALTER PUBLICATION name ADD publication_object [, ...] +ALTER PUBLICATION name SET publication_object [, ...] +ALTER PUBLICATION name DROP publication_object [, ...] ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] ) ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER PUBLICATION name RENAME TO new_name + +where publication_object is one of: + + TABLE [ ONLY ] table_name [ * ] [, ... ] + ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -39,14 +44,15 @@ ALTER PUBLICATION name RENAME TO - The first three variants change which tables are part of the publication. - The SET TABLE clause will replace the list of tables in - the publication with the specified one. The ADD TABLE - and DROP TABLE clauses will add and remove one or more - tables from the publication. Note that adding tables to a publication that - is already subscribed to will require a ALTER SUBSCRIPTION - ... REFRESH PUBLICATION action on the subscribing side in order - to become effective. + The first three variants change which tables/schemas are part of the + publication. The SET clause will replace the list of + tables/schemas in the publication with the specified list; the existing + tables/schemas that were present in the publication will be removed. The + ADD and DROP clauses will add and + remove one or more tables/schemas from the publication. Note that adding + tables/schemas to a publication that is already subscribed to will require a + ALTER SUBSCRIPTION ... REFRESH PUBLICATION action on the + subscribing side in order to become effective. @@ -63,11 +69,22 @@ ALTER PUBLICATION name RENAME TO You must own the publication to use ALTER PUBLICATION. Adding a table to a publication additionally requires owning that table. - To alter the owner, you must also be a direct or indirect member of the new - owning role. The new owner must have CREATE privilege on - the database. Also, the new owner of a FOR ALL TABLES - publication must be a superuser. However, a superuser can change the - ownership of a publication regardless of these restrictions. + The ADD ALL TABLES IN SCHEMA and + SET ALL TABLES IN SCHEMA to a publication requires the + invoking user to be a superuser. To alter the owner, you must also be a + direct or indirect member of the new owning role. The new owner must have + CREATE privilege on the database. Also, the new owner + of a FOR ALL TABLES or FOR ALL TABLES IN + SCHEMA publication must be a superuser. However, a superuser can + change the ownership of a publication regardless of these restrictions. + + + + Adding/Setting a table that is part of schema specified in + ALL TABLES IN SCHEMA, adding/setting a schema to a + publication that already has a table that is part of specified schema or + adding/setting a table to a publication that already has a table's schema as + part of the specified schema is not supported. @@ -97,6 +114,15 @@ ALTER PUBLICATION name RENAME TO + + schema_name + + + Name of an existing schema. + + + + SET ( publication_parameter [= value] [, ... ] ) @@ -142,6 +168,25 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete'); ALTER PUBLICATION mypublication ADD TABLE users, departments; + + + Add schemas marketing and + sales to the publication + sales_publication: + +ALTER PUBLICATION sales_publication ADD ALL TABLES IN SCHEMA marketing, sales; + + + + + Add tables users, + departments and schema + production to the publication + production_publication: + +ALTER PUBLICATION production_publication ADD TABLE users, departments, ALL TABLES IN SCHEMA production; + + diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index ff82fbca55b..ca01d8c29bc 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -22,9 +22,14 @@ PostgreSQL documentation CREATE PUBLICATION name - [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] - | FOR ALL TABLES ] + [ FOR ALL TABLES + | FOR publication_object [, ... ] ] [ WITH ( publication_parameter [= value] [, ... ] ) ] + +where publication_object is one of: + + TABLE [ ONLY ] table_name [ * ] [, ... ] + ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -86,6 +91,11 @@ CREATE PUBLICATION name partition are also published via publications that its ancestors are part of. + + + Specifying a table that is part of a schema specified by + FOR ALL TABLES IN SCHEMA is not supported. + @@ -99,6 +109,37 @@ CREATE PUBLICATION name + + FOR ALL TABLES IN SCHEMA + + + Marks the publication as one that replicates changes for all tables in + the specified list of schemas, including tables created in the future. + + + + Specifying a schema along with a table which belongs to the specified + schema using FOR TABLE is not supported. + + + + Only persistent base tables and partitioned tables present in the schema + will be included as part of the publication. Temporary tables, unlogged + tables, foreign tables, materialized views, and regular views from the + schema will not be part of the publication. + + + + When a partitioned table is published via schema level publication, all + of its existing and future partitions irrespective of it being from the + publication schema or not are implicitly considered to be part of the + publication. So, even operations that are performed directly on a + partition are also published via publications that its ancestors are + part of. + + + + WITH ( publication_parameter [= value] [, ... ] ) @@ -153,9 +194,10 @@ CREATE PUBLICATION name Notes - If neither FOR TABLE nor FOR ALL - TABLES is specified, then the publication starts out with an - empty set of tables. That is useful if tables are to be added later. + If FOR TABLE, FOR ALL TABLES or + FOR ALL TABLES IN SCHEMA is not specified, then the + publication starts out with an empty set of tables. That is useful if + tables or schemas are to be added later. @@ -171,8 +213,9 @@ CREATE PUBLICATION name To add a table to a publication, the invoking user must have ownership - rights on the table. The FOR ALL TABLES clause requires - the invoking user to be a superuser. + rights on the table. The FOR ALL TABLES and + FOR ALL TABLES IN SCHEMA clauses require the invoking + user to be a superuser. @@ -222,6 +265,25 @@ CREATE PUBLICATION alltables FOR ALL TABLES; CREATE PUBLICATION insert_only FOR TABLE mydata WITH (publish = 'insert'); + + + + + Create a publication that publishes all changes for tables + users, departments and + all changes for all the tables present in the schema + production: + +CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABLES IN SCHEMA production; + + + + + Create a publication that publishes all changes for all the tables present in + the schemas marketing and + sales: + +CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales; diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index 14e0a4dbe39..48248f750e5 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -1853,8 +1853,8 @@ testdb=> If pattern is specified, only those publications whose names match the pattern are listed. - If + is appended to the command name, the tables - associated with each publication are shown as well. + If + is appended to the command name, the tables and + schemas associated with each publication are shown as well. diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e773612..4e6efda97f3 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_namespace.h \ + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154ee..ce0a4ff14e3 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3427,6 +3427,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -3566,6 +3567,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_TRANSFORM: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 91c3e976e01..9f8eb1a37fd 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -49,6 +49,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -178,6 +179,7 @@ static const Oid object_classes[] = { ExtensionRelationId, /* OCLASS_EXTENSION */ EventTriggerRelationId, /* OCLASS_EVENT_TRIGGER */ PolicyRelationId, /* OCLASS_POLICY */ + PublicationNamespaceRelationId, /* OCLASS_PUBLICATION_NAMESPACE */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ @@ -1456,6 +1458,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePolicyById(object->objectId); break; + case OCLASS_PUBLICATION_NAMESPACE: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_PUBLICATION_REL: RemovePublicationRelById(object->objectId); break; @@ -2850,6 +2856,9 @@ getObjectClass(const ObjectAddress *object) case PolicyRelationId: return OCLASS_POLICY; + case PublicationNamespaceRelationId: + return OCLASS_PUBLICATION_NAMESPACE; + case PublicationRelationId: return OCLASS_PUBLICATION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 8c94939baa8..2bae3fbb174 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -48,6 +48,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -825,6 +826,10 @@ static const struct object_type_map { "publication", OBJECT_PUBLICATION }, + /* OCLASS_PUBLICATION_NAMESPACE */ + { + "publication namespace", OBJECT_PUBLICATION_NAMESPACE + }, /* OCLASS_PUBLICATION_REL */ { "publication relation", OBJECT_PUBLICATION_REL @@ -875,6 +880,8 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1113,6 +1120,10 @@ get_object_address(ObjectType objtype, Node *object, address = get_object_address_usermapping(castNode(List, object), missing_ok); break; + case OBJECT_PUBLICATION_NAMESPACE: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_PUBLICATION_REL: address = get_object_address_publication_rel(castNode(List, object), &relation, @@ -1935,6 +1946,49 @@ get_object_address_publication_rel(List *object, return address; } +/* + * Find the ObjectAddress for a publication schema. The first element of the + * object parameter is the schema name, the second is the publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + Publication *pub; + char *pubname; + char *schemaname; + Oid schemaid; + + ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid); + + /* Fetch schema name and publication name from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaid = get_namespace_oid(schemaname, missing_ok); + if (!OidIsValid(schemaid)) + return address; + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + /* * Find the ObjectAddress for a default ACL. */ @@ -2206,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_DOMCONSTRAINT: case OBJECT_CAST: case OBJECT_USER_MAPPING: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_DEFACL: case OBJECT_TRANSFORM: @@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2848,6 +2904,55 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) return tuple; } +/* + * getPublicationSchemaInfo + * + * Get publication name and schema name from the object address into pubname and + * nspname. Both pubname and nspname are palloc'd strings which will be freed by + * the caller. + */ +static bool +getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, + char **pubname, char **nspname) +{ + HeapTuple tup; + Form_pg_publication_namespace pnform; + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + return false; + } + + pnform = (Form_pg_publication_namespace) GETSTRUCT(tup); + *pubname = get_publication_name(pnform->pnpubid, missing_ok); + if (!(*pubname)) + { + ReleaseSysCache(tup); + return false; + } + + *nspname = get_namespace_name(pnform->pnnspid); + if (!(*nspname)) + { + Oid schemaid = pnform->pnnspid; + + pfree(*pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + schemaid); + return false; + } + + ReleaseSysCache(tup); + return true; +} + /* * getObjectDescription: build an object description for messages * @@ -3872,6 +3977,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, + &pubname, &nspname)) + break; + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; @@ -4473,6 +4594,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication"); break; + case OCLASS_PUBLICATION_NAMESPACE: + appendStringInfoString(&buffer, "publication namespace"); + break; + case OCLASS_PUBLICATION_REL: appendStringInfoString(&buffer, "publication relation"); break; @@ -5683,6 +5808,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, &pubname, + &nspname)) + break; + appendStringInfo(&buffer, "%s in publication %s", + nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9cd0c82f93c..fed83b89a98 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,7 +28,9 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" @@ -76,6 +78,30 @@ check_publication_add_relation(Relation targetrel) errdetail("Temporary and unlogged relations cannot be replicated."))); } +/* + * Check if schema can be in given publication and throw appropriate error if + * not. + */ +static void +check_publication_add_schema(Oid schemaid) +{ + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("This operation is not supported for system schemas."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("Temporary schemas cannot be replicated."))); +} + /* * Returns if relation represented by oid and Form_pg_class entry * is publishable. @@ -105,6 +131,53 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) relid >= FirstNormalObjectId; } +/* + * Filter out the partitions whose parent tables were also specified in + * the publication. + */ +static List * +filter_partitions(List *relids, List *schemarelids) +{ + List *result = NIL; + ListCell *lc; + ListCell *lc2; + + foreach(lc, relids) + { + bool skip = false; + List *ancestors = NIL; + Oid relid = lfirst_oid(lc); + + if (get_rel_relispartition(relid)) + ancestors = get_partition_ancestors(relid); + + foreach(lc2, ancestors) + { + Oid ancestor = lfirst_oid(lc2); + + /* + * Check if the parent table exists in the published table list. + * + * XXX As of now, we do this if the partition relation or the + * partition relation's ancestor is present in schema publication + * relations. + */ + if (list_member_oid(relids, ancestor) && + (list_member_oid(schemarelids, relid) || + list_member_oid(schemarelids, ancestor))) + { + skip = true; + break; + } + } + + if (!skip) + result = lappend_oid(result, relid); + } + + return result; +} + /* * Another variant of this, taking a Relation. */ @@ -262,6 +335,89 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_namespace]; + bool nulls[Natts_pg_publication_namespace]; + Oid psschid; + Publication *pub = GetPublication(pubid); + List *schemaRels = NIL; + ObjectAddress myself, + referenced; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaid), pub->name))); + } + + check_publication_add_schema(schemaid); + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId, + Anum_pg_publication_namespace_oid); + values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_namespace_pnpubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_namespace_pnnspid - 1] = + ObjectIdGetDatum(schemaid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + /* + * Invalidate relcache so that publication info is rebuilt. See + * publication_add_relation for why we need to consider all the + * partitions. + */ + schemaRels = GetSchemaPublicationRelations(schemaid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -428,6 +584,151 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR ALL TABLES IN SCHEMA publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all schemas associated with the publication */ + pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_namespace_pnpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, + PublicationNamespacePnnspidPnpubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_namespace pubsch; + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->pnnspid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + +/* + * Gets the list of publication oids associated with a specified schema. + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + CatCList *pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); + + return result; +} + +/* + * Get the list of publishable relation oids for a specified schema. + */ +List * +GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + Assert(OidIsValid(schemaid)); + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaid); + + /* get all the relations present in the specified schema */ + scan = table_beginscan_catalog(classRel, 1, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + char relkind; + + if (!is_publishable_class(relid, relForm)) + continue; + + relkind = get_rel_relkind(relid); + if (relkind == RELKIND_RELATION) + result = lappend_oid(result, relid); + else if (relkind == RELKIND_PARTITIONED_TABLE) + { + List *partitionrels = NIL; + + /* + * It is quite possible that some of the partitions are in a + * different schema than the parent table, so we need to get such + * partitions separately. + */ + partitionrels = GetPubPartitionOptionRelations(partitionrels, + pub_partopt, + relForm->oid); + result = list_concat_unique_oid(result, partitionrels); + } + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + return result; +} + +/* + * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA + * publication. + */ +List * +GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(pubid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt); + result = list_concat(result, schemaRels); + } + + return result; +} + /* * Get publication using oid * @@ -555,12 +856,41 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->alltables) + { tables = GetAllTablesPublicationRelations(publication->pubviaroot); + } else - tables = GetPublicationRelations(publication->oid, + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + tables = list_concat_unique_oid(relids, schemarelids); + if (schemarelids && publication->pubviaroot) + { + /* + * If the publication publishes partition changes via their + * respective root partitioned tables, we must exclude + * partitions in favor of including the root partitioned + * tables. Otherwise, the function could return both the child + * and parent tables which could cause data of the child table + * to be double-published on the subscriber side. + * + * XXX As of now, we do this when a publication has associated + * schema or for all tables publication. See + * GetAllTablesPublicationRelations(). + */ + tables = filter_partitions(tables, schemarelids); + } + } + funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c47d54e96bb..40044070cf3 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_EVENT_TRIGGER: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71612d577e9..df264329d8b 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_POLICY: case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROUTINE: case OBJECT_RULE: @@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_EXTENSION: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c7f91611dc..d1fff13d2e9 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,7 +25,9 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -34,6 +36,7 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" @@ -45,11 +48,16 @@ #include "utils/syscache.h" #include "utils/varlena.h" +static List *OpenReliIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); +static void LockSchemaList(List *schemalist); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -135,6 +143,97 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the PublicationObjSpecType list into schema oid list and + * PublicationTable list. + */ +static void +ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, + List **rels, List **schemas) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + + if (!pubobjspec_list) + return; + + foreach(cell, pubobjspec_list) + { + Oid schemaid; + List *search_path; + + pubobj = (PublicationObjSpec *) lfirst(cell); + + switch (pubobj->pubobjtype) + { + case PUBLICATIONOBJ_TABLE: + *rels = lappend(*rels, pubobj->pubtable); + break; + case PUBLICATIONOBJ_REL_IN_SCHEMA: + schemaid = get_namespace_oid(pubobj->name, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_CURRSCHEMA: + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + default: + /* shouldn't happen */ + elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype); + break; + } + } +} + +/* + * Check if any of the given relation's schema is a member of the given schema + * list. + */ +static void +CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, + PublicationObjSpecType checkobjtype) +{ + ListCell *lc; + + foreach(lc, rels) + { + PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc); + Relation rel = pub_rel->relation; + Oid relSchemaId = RelationGetNamespace(rel); + + if (list_member_oid(schemaidlist, relSchemaId)) + { + if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_TABLE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); + } + } +} + /* * Create new publication. */ @@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; + List *relations = NIL; + List *schemaidlist = NIL; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); @@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - if (stmt->tables) - { - List *rels; - - Assert(list_length(stmt->tables) > 0); - - rels = OpenTableList(stmt->tables); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); - } - else if (stmt->for_all_tables) + /* Associate objects with the publication. */ + if (stmt->for_all_tables) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } + else + { + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + /* FOR ALL TABLES IN SCHEMA requires superuser */ + if (list_length(schemaidlist) > 0 && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); + + if (list_length(relations) > 0) + { + List *rels; + + rels = OpenTableList(relations); + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + PublicationAddTables(puboid, rels, true, NULL); + CloseTableList(rels); + } + + if (list_length(schemaidlist) > 0) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + PublicationAddSchemas(puboid, schemaidlist, true, NULL); + } + } table_close(rel, RowExclusiveLock); @@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, } else { + List *relids = NIL; + List *schemarelids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); InvalidatePublicationRels(relids); } @@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids) * Add or remove table to/from publication. */ static void -AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, - HeapTuple tup) +AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; - /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ + if (!tables && stmt->action != DEFELEM_SET) + return; - Assert(list_length(stmt->tables) > 0); + rels = OpenTableList(tables); - rels = OpenTableList(stmt->tables); + if (stmt->action == DEFELEM_ADD) + { + List *schemas = NIL; - if (stmt->tableAction == DEFELEM_ADD) + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_TABLE); PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + } + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *delrels = NIL; ListCell *oldlc; + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + /* Calculate which relations to drop. */ foreach(oldlc, oldrelids) { @@ -440,11 +581,111 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, CloseTableList(rels); } +/* + * Alter the publication schemas. + * + * Add or remove schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, + HeapTuple tup, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* + * It is quite possible that for the SET case user has not specified any + * schemas in which case we need to remove all the existing schemas. + */ + if (!schemaidlist && stmt->action != DEFELEM_SET) + return; + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + if (stmt->action == DEFELEM_ADD) + { + List *rels; + List *reloids; + + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + rels = OpenReliIdList(reloids); + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_REL_IN_SCHEMA); + + CloseTableList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + } + else if (stmt->action == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaidlist, false); + else /* DEFELEM_SET */ + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + /* Identify which schemas should be dropped */ + delschemas = list_difference_oid(oldschemaids, schemaidlist); + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. + */ + LockSchemaList(delschemas); + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + } +} + +/* + * Check if relations and schemas can be in a given publication and throw + * appropriate error if not. + */ +static void +CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + schemaidlist && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); + + /* + * Check that user is allowed to manipulate the publication tables in + * schema + */ + if (schemaidlist && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (tables && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); +} + /* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -474,7 +715,41 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); else - AlterPublicationTables(stmt, rel, tup); + { + List *relations = NIL; + List *schemaidlist = NIL; + + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + CheckAlterPublication(stmt, tup, relations, schemaidlist); + + /* + * Lock the publication so nobody else can do anything with it. This + * prevents concurrent alter to add table(s) that were already going + * to become part of the publication by adding corresponding schema(s) + * via this command and similarly it will prevent the concurrent + * addition of schema(s) for which there is any corresponding table + * being added by this command. + */ + LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + AccessExclusiveLock); + + /* + * It is possible that by the time we acquire the lock on publication, + * concurrent DDL has removed it. We can test this by checking the + * existence of publication. + */ + if (!SearchSysCacheExists1(PUBLICATIONOID, + ObjectIdGetDatum(pubform->oid))) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication \"%s\" does not exist", + stmt->pubname)); + + AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationSchemas(stmt, tup, schemaidlist); + } /* Cleanup. */ heap_freetuple(tup); @@ -551,10 +826,72 @@ RemovePublicationById(Oid pubid) table_close(rel, RowExclusiveLock); } +/* + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + List *schemaRels = NIL; + Form_pg_publication_namespace pubsch; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", psoid); + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + /* + * Invalidate relcache so that publication info is rebuilt. See + * RemovePublicationRelById for why we need to consider all the + * partitions. + */ + schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* + * Open relations specified by a relid list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. + */ +static List * +OpenReliIdList(List *relids) +{ + ListCell *lc; + List *rels = NIL; + + foreach(lc, relids) + { + PublicationRelInfo *pub_rel; + Oid relid = lfirst_oid(lc); + Relation rel = table_open(relid, + ShareUpdateExclusiveLock); + + pub_rel = palloc(sizeof(PublicationRelInfo)); + pub_rel->relation = rel; + rels = lappend(rels, pub_rel); + } + + return rels; +} + /* * Open relations specified by a PublicationTable list. - * In the returned list of PublicationRelInfo, tables are locked - * in ShareUpdateExclusiveLock mode in order to add them to a publication. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -658,6 +995,35 @@ CloseTableList(List *rels) } } +/* + * Lock the schemas specified in the schema list in AccessShareLock mode in + * order to prevent concurrent schema deletion. + */ +static void +LockSchemaList(List *schemalist) +{ + ListCell *lc; + + foreach(lc, schemalist) + { + Oid schemaid = lfirst_oid(lc); + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock); + + /* + * It is possible that by the time we acquire the lock on schema, + * concurrent DDL has removed it. We can test this by checking the + * existence of schema. + */ + if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid))) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema with OID %u does not exist", schemaid)); + } +} + /* * Add listed tables to the publication. */ @@ -727,6 +1093,68 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_tables); + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + ObjectAddress obj; + + obj = publication_add_schema(pubid, schemaid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationNamespaceRelationId, + obj.objectId, 0); + } + } +} + +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tables from schema \"%s\" are not part of the publication", + get_namespace_name(schemaid)))); + } + + ObjectAddressSet(obj, PublicationNamespaceRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + /* * Internal workhorse for changing a publication owner */ diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index 308e0adb553..53c18628a7d 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPERATOR: case OBJECT_OPFAMILY: case OBJECT_POLICY: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1a2f159f24e..857cc5ce6e2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -12286,6 +12286,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EXTENSION: case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -15994,6 +15995,33 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1); nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL); + /* + * Check that setting the relation to a different schema won't result in a + * publication having both a schema and the same schema's table, as this + * is not supported. + */ + if (stmt->objectType == OBJECT_TABLE) + { + ListCell *lc; + List *schemaPubids = GetSchemaPublications(nspOid); + List *relPubids = GetRelationPublications(RelationGetRelid(rel)); + + foreach(lc, relPubids) + { + Oid pubid = lfirst_oid(lc); + + if (list_member_oid(schemaPubids, pubid)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move table \"%s\" to schema \"%s\"", + RelationGetRelationName(rel), stmt->newschema), + errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".", + stmt->newschema, + RelationGetRelationName(rel), + get_publication_name(pubid, false))); + } + } + /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 70e9e54d3e5..82464c98896 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4810,6 +4810,19 @@ _copyPartitionCmd(const PartitionCmd *from) return newnode; } +static PublicationObjSpec * +_copyPublicationObject(const PublicationObjSpec *from) +{ + PublicationObjSpec *newnode = makeNode(PublicationObjSpec); + + COPY_SCALAR_FIELD(pubobjtype); + COPY_STRING_FIELD(name); + COPY_NODE_FIELD(pubtable); + COPY_LOCATION_FIELD(location); + + return newnode; +} + static PublicationTable * _copyPublicationTable(const PublicationTable *from) { @@ -4827,7 +4840,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); return newnode; @@ -4840,9 +4853,9 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(action); return newnode; } @@ -5887,6 +5900,9 @@ copyObjectImpl(const void *from) case T_PartitionCmd: retval = _copyPartitionCmd(from); break; + case T_PublicationObjSpec: + retval = _copyPublicationObject(from); + break; case T_PublicationTable: retval = _copyPublicationTable(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 19eff201024..f537d3eb968 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2296,6 +2296,18 @@ _equalAlterTSConfigurationStmt(const AlterTSConfigurationStmt *a, return true; } +static bool +_equalPublicationObject(const PublicationObjSpec *a, + const PublicationObjSpec *b) +{ + COMPARE_SCALAR_FIELD(pubobjtype); + COMPARE_STRING_FIELD(name); + COMPARE_NODE_FIELD(pubtable); + COMPARE_LOCATION_FIELD(location); + + return true; +} + static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { @@ -2310,7 +2322,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); return true; @@ -2322,9 +2334,9 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(action); return true; } @@ -3894,6 +3906,9 @@ equal(const void *a, const void *b) case T_PartitionCmd: retval = _equalPartitionCmd(a, b); break; + case T_PublicationObjSpec: + retval = _equalPublicationObject(a, b); + break; case T_PublicationTable: retval = _equalPublicationTable(a, b); break; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 08f1bf1031c..d0eb80e69cb 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -195,12 +195,17 @@ static Node *makeXmlExpr(XmlExprOp op, char *name, List *named_args, static List *mergeTableFuncParameters(List *func_args, List *columns); static TypeName *TableFuncTypeName(List *columns); static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner); +static RangeVar *makeRangeVarFromQualifiedName(char *name, List *rels, + int location, + core_yyscan_t yyscanner); static void SplitColQualList(List *qualList, List **constraintList, CollateClause **collClause, core_yyscan_t yyscanner); static void processCASbits(int cas_bits, int location, const char *constrType, bool *deferrable, bool *initdeferred, bool *not_valid, bool *no_inherit, core_yyscan_t yyscanner); +static void preprocess_pubobj_list(List *pubobjspec_list, + core_yyscan_t yyscanner); static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %} @@ -256,6 +261,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + PublicationObjSpec *publicationobjectspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -425,14 +431,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list publication_table_list + drop_option_list pub_obj_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables publication_table %type opt_fdw_options fdw_options %type fdw_option @@ -517,6 +522,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type table_ref %type joined_table %type relation_expr +%type extended_relation_expr %type relation_expr_opt_alias %type tablesample_clause opt_repeatable_clause %type target_el set_target insert_column_item @@ -553,6 +559,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type createdb_opt_name plassign_target %type var_value zone_value %type auth_ident RoleSpec opt_granted_by +%type PublicationObjSpec %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword @@ -9591,69 +9598,131 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] + * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] + * + * pub_obj is one of: + * + * TABLE table [, ...] + * ALL TABLES IN SCHEMA schema [, ...] * *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR ALL TABLES opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR pub_obj_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $6; + n->pubobjects = (List *)$5; + preprocess_pubobj_list(n->pubobjects, yyscanner); $$ = (Node *)n; } ; -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE publication_table_list +/* + * FOR TABLE and FOR ALL TABLES IN SCHEMA specifications + * + * This rule parses publication objects with and without keyword prefixes. + * + * The actual type of the object without keyword prefix depends on the previous + * one with keyword prefix. It will be preprocessed in preprocess_pubobj_list(). + * + * For the object without keyword prefix, we cannot just use relation_expr here, + * because some extended expressions in relation_expr cannot be used as a + * schemaname and we cannot differentiate it. So, we extract the rules from + * relation_expr here. + */ +PublicationObjSpec: + TABLE relation_expr { - $$ = (Node *) $3; + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; } - | FOR ALL TABLES + | ALL TABLES IN_P SCHEMA ColId { - $$ = (Node *) makeInteger(true); + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + $$->name = $5; + $$->location = @5; } - ; + | ALL TABLES IN_P SCHEMA CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; + $$->location = @5; + } + | ColId + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->name = $1; + $$->location = @1; + } + | ColId indirection + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->location = @1; + } + /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ + | extended_relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $1; + } + | CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->location = @1; + } + ; -publication_table_list: - publication_table +pub_obj_list: PublicationObjSpec { $$ = list_make1($1); } - | publication_table_list ',' publication_table - { $$ = lappend($1, $3); } - ; - -publication_table: relation_expr - { - PublicationTable *n = makeNode(PublicationTable); - n->relation = $1; - $$ = (Node *) n; - } + | pub_obj_list ',' PublicationObjSpec + { $$ = lappend($1, $3); } ; /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) * - * ALTER PUBLICATION name ADD TABLE table [, table2] + * ALTER PUBLICATION name ADD pub_obj [, ...] * - * ALTER PUBLICATION name DROP TABLE table [, table2] + * ALTER PUBLICATION name DROP pub_obj [, ...] * - * ALTER PUBLICATION name SET TABLE table [, table2] + * ALTER PUBLICATION name SET pub_obj [, ...] + * + * pub_obj is one of: + * + * TABLE table_name [, ...] + * ALL TABLES IN SCHEMA schema_name [, ...] * *****************************************************************************/ @@ -9665,28 +9734,31 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE publication_table_list + | ALTER PUBLICATION name ADD_P pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE publication_table_list + | ALTER PUBLICATION name SET pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_SET; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE publication_table_list + | ALTER PUBLICATION name DROP pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; @@ -12430,7 +12502,14 @@ relation_expr: $$->inh = true; $$->alias = NULL; } - | qualified_name '*' + | extended_relation_expr + { + $$ = $1; + } + ; + +extended_relation_expr: + qualified_name '*' { /* inheritance query, explicitly */ $$ = $1; @@ -15104,28 +15183,7 @@ qualified_name: } | ColId indirection { - check_qualified_name($2, yyscanner); - $$ = makeRangeVar(NULL, NULL, @1); - switch (list_length($2)) - { - case 1: - $$->catalogname = NULL; - $$->schemaname = $1; - $$->relname = strVal(linitial($2)); - break; - case 2: - $$->catalogname = $1; - $$->schemaname = strVal(linitial($2)); - $$->relname = strVal(lsecond($2)); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("improper qualified name (too many dotted names): %s", - NameListToString(lcons(makeString($1), $2))), - parser_errposition(@1))); - break; - } + $$ = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); } ; @@ -17102,6 +17160,43 @@ makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner) return r; } +/* + * Convert a relation_name with name and namelist to a RangeVar using + * makeRangeVar. + */ +static RangeVar * +makeRangeVarFromQualifiedName(char *name, List *namelist, int location, + core_yyscan_t yyscanner) +{ + RangeVar *r; + + check_qualified_name(namelist, yyscanner); + r = makeRangeVar(NULL, NULL, location); + + switch (list_length(namelist)) + { + case 1: + r->catalogname = NULL; + r->schemaname = name; + r->relname = strVal(linitial(namelist)); + break; + case 2: + r->catalogname = name; + r->schemaname = strVal(linitial(namelist)); + r->relname = strVal(lsecond(namelist)); + break; + default: + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("improper qualified name (too many dotted names): %s", + NameListToString(lcons(makeString(name), namelist))), + parser_errposition(location)); + break; + } + + return r; +} + /* Separate Constraint nodes from COLLATE clauses in a ColQualList */ static void SplitColQualList(List *qualList, @@ -17210,6 +17305,74 @@ processCASbits(int cas_bits, int location, const char *constrType, } } +/* + * Process pubobjspec_list to check for errors in any of the objects and + * convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType. + */ +static void +preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + PublicationObjSpecType prevobjtype = PUBLICATIONOBJ_CONTINUATION; + + if (!pubobjspec_list) + return; + + pubobj = (PublicationObjSpec *) linitial(pubobjspec_list); + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s)"), + parser_errposition(pubobj->location)); + + foreach(cell, pubobjspec_list) + { + pubobj = (PublicationObjSpec *) lfirst(cell); + + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + pubobj->pubobjtype = prevobjtype; + + if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE) + { + /* relation name or pubtable must be set for this type of object */ + if (!pubobj->name && !pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid table name at or near"), + parser_errposition(pubobj->location)); + else if (pubobj->name) + { + /* convert it to PublicationTable */ + PublicationTable *pubtable = makeNode(PublicationTable); + pubtable->relation = makeRangeVar(NULL, pubobj->name, + pubobj->location); + pubobj->pubtable = pubtable; + pubobj->name = NULL; + } + } + else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA || + pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA) + { + /* + * We can distinguish between the different type of schema + * objects based on whether name and pubtable is set. + */ + if (pubobj->name) + pubobj->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + else if (!pubobj->name && !pubobj->pubtable) + pubobj->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; + else + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid schema name at or near"), + parser_errposition(pubobj->location)); + } + + prevobjtype = pubobj->pubobjtype; + } +} + /*---------- * Recursive view transformation * diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd933..6f6a203dea7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1068,6 +1068,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1146,7 +1149,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Validate the entry */ if (!entry->replicate_valid) { + Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); + + /* + * We don't acquire a lock on the namespace system table as we build + * the cache entry using a historic snapshot and all the later changes + * are absorbed while decoding WAL. + */ + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; @@ -1203,6 +1214,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid ancestor = lfirst_oid(lc2); if (list_member_oid(GetRelationPublications(ancestor), + pub->oid) || + list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), pub->oid)) { ancestor_published = true; @@ -1212,7 +1225,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } } - if (list_member_oid(pubids, pub->oid) || ancestor_published) + if (list_member_oid(pubids, pub->oid) || + list_member_oid(schemaPubids, pub->oid) || + ancestor_published) publish = true; } @@ -1343,7 +1358,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } /* - * Publication relation map syscache invalidation callback + * Publication relation/schema map syscache invalidation callback */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index b54c9117669..9fa9e671a11 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5550,6 +5550,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5565,6 +5566,9 @@ GetRelationPublicationActions(Relation relation) /* Fetch the publication membership info. */ puboids = GetRelationPublications(RelationGetRelid(relation)); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ @@ -5577,6 +5581,9 @@ GetRelationPublicationActions(Relation relation) puboids = list_concat_unique_oid(puboids, GetRelationPublications(ancestor)); + schemaid = get_rel_namespace(ancestor); + puboids = list_concat_unique_oid(puboids, + GetSchemaPublications(schemaid)); } } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8d..56870b46e45 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -50,6 +50,7 @@ #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" @@ -617,6 +618,28 @@ static const struct cachedesc cacheinfo[] = { }, 8 }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACE */ + PublicationNamespaceObjectIndexId, + 1, + { + Anum_pg_publication_namespace_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */ + PublicationNamespacePnnspidPnpubidIndexId, + 2, + { + Anum_pg_publication_namespace_pnnspid, + Anum_pg_publication_namespace_pnpubid, + 0, + 0 + }, + 64 + }, {PublicationRelationId, /* PUBLICATIONOID */ PublicationObjectIndexId, 1, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index e38ff3c030a..ecab0a9e4ea 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -254,9 +254,12 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading publications"); (void) getPublications(fout, &numPublications); - pg_log_info("reading publication membership"); + pg_log_info("reading publication membership of tables"); getPublicationTables(fout, tblinfo, numTables); + pg_log_info("reading publication membership of schemas"); + getPublicationNamespaces(fout); + pg_log_info("reading subscriptions"); getSubscriptions(fout); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 9b0e699ce86..2c4cfb9457e 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2788,7 +2788,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH) */ if (ropt->no_publications && (strcmp(te->desc, "PUBLICATION") == 0 || - strcmp(te->desc, "PUBLICATION TABLE") == 0)) + strcmp(te->desc, "PUBLICATION TABLE") == 0 || + strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0)) return 0; /* If it's a security label, maybe ignore it */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e9f68e89867..d1842edde0d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -1875,14 +1875,15 @@ selectDumpableExtension(ExtensionInfo *extinfo, DumpOptions *dopt) } /* - * selectDumpablePublicationTable: policy-setting subroutine - * Mark a publication table as to be dumped or not + * selectDumpablePublicationObject: policy-setting subroutine + * Mark a publication object as to be dumped or not * - * Publication tables have schemas, but those are ignored in decision making, - * because publications are only dumped when we are dumping everything. + * A publication can have schemas and tables which have schemas, but those are + * ignored in decision making, because publications are only dumped when we are + * dumping everything. */ static void -selectDumpablePublicationTable(DumpableObject *dobj, Archive *fout) +selectDumpablePublicationObject(DumpableObject *dobj, Archive *fout) { if (checkExtensionMembership(dobj, fout)) return; /* extension membership overrides all else */ @@ -4126,6 +4127,93 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) free(qpubname); } +/* + * getPublicationNamespaces + * get information about publication membership for dumpable schemas. + */ +void +getPublicationNamespaces(Archive *fout) +{ + PQExpBuffer query; + PGresult *res; + PublicationSchemaInfo *pubsinfo; + DumpOptions *dopt = fout->dopt; + int i_tableoid; + int i_oid; + int i_pnpubid; + int i_pnnspid; + int i, + j, + ntups; + + if (dopt->no_publications || fout->remoteVersion < 150000) + return; + + query = createPQExpBuffer(); + + /* Collect all publication membership info. */ + appendPQExpBufferStr(query, + "SELECT tableoid, oid, pnpubid, pnnspid " + "FROM pg_catalog.pg_publication_namespace"); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pnpubid = PQfnumber(res, "pnpubid"); + i_pnnspid = PQfnumber(res, "pnnspid"); + + /* this allocation may be more than we need */ + pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo)); + j = 0; + + for (i = 0; i < ntups; i++) + { + Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid)); + Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid)); + PublicationInfo *pubinfo; + NamespaceInfo *nspinfo; + + /* + * Ignore any entries for which we aren't interested in either the + * publication or the rel. + */ + pubinfo = findPublicationByOid(pnpubid); + if (pubinfo == NULL) + continue; + nspinfo = findNamespaceByOid(pnnspid); + if (nspinfo == NULL) + continue; + + /* + * We always dump publication namespaces unless the corresponding + * namespace is excluded from the dump. + */ + if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE) + continue; + + /* OK, make a DumpableObject for this relationship */ + pubsinfo[j].dobj.objType = DO_PUBLICATION_REL_IN_SCHEMA; + pubsinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&pubsinfo[j].dobj); + pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace; + pubsinfo[j].dobj.name = nspinfo->dobj.name; + pubsinfo[j].publication = pubinfo; + pubsinfo[j].pubschema = nspinfo; + + /* Decide whether we want to dump it */ + selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout); + + j++; + } + + PQclear(res); + destroyPQExpBuffer(query); +} + /* * getPublicationTables * get information about publication membership for dumpable tables. @@ -4204,7 +4292,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) pubrinfo[j].pubtable = tbinfo; /* Decide whether we want to dump it */ - selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout); + selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); j++; } @@ -4213,6 +4301,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) destroyPQExpBuffer(query); } +/* + * dumpPublicationNamespace + * dump the definition of the given publication schema mapping. + */ +static void +dumpPublicationNamespace(Archive *fout, const PublicationSchemaInfo *pubsinfo) +{ + NamespaceInfo *schemainfo = pubsinfo->pubschema; + PublicationInfo *pubinfo = pubsinfo->publication; + PQExpBuffer query; + char *tag; + + if (!(pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + return; + + tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name)); + appendPQExpBuffer(query, "ADD ALL TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + + /* + * There is no point in creating drop query as the drop is done by schema + * drop. + */ + ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId, + ARCHIVE_OPTS(.tag = tag, + .namespace = schemainfo->dobj.name, + .owner = pubinfo->rolname, + .description = "PUBLICATION TABLES IN SCHEMA", + .section = SECTION_POST_DATA, + .createStmt = query->data)); + + free(tag); + destroyPQExpBuffer(query); +} + /* * dumpPublicationTable * dump the definition of the given publication table mapping @@ -10205,6 +10331,10 @@ dumpDumpableObject(Archive *fout, const DumpableObject *dobj) case DO_PUBLICATION_REL: dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); break; + case DO_PUBLICATION_REL_IN_SCHEMA: + dumpPublicationNamespace(fout, + (const PublicationSchemaInfo *) dobj); + break; case DO_SUBSCRIPTION: dumpSubscription(fout, (const SubscriptionInfo *) dobj); break; @@ -18429,6 +18559,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_POLICY: case DO_PUBLICATION: case DO_PUBLICATION_REL: + case DO_PUBLICATION_REL_IN_SCHEMA: case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index cc55e598ece..f9af14b7936 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_POLICY, DO_PUBLICATION, DO_PUBLICATION_REL, + DO_PUBLICATION_REL_IN_SCHEMA, DO_SUBSCRIPTION } DumpableObjectType; @@ -631,6 +632,17 @@ typedef struct _PublicationRelInfo TableInfo *pubtable; } PublicationRelInfo; +/* + * The PublicationSchemaInfo struct is used to represent publication schema + * mapping. + */ +typedef struct _PublicationSchemaInfo +{ + DumpableObject dobj; + PublicationInfo *publication; + NamespaceInfo *pubschema; +} PublicationSchemaInfo; + /* * The SubscriptionInfo struct is used to represent subscription. */ @@ -725,6 +737,7 @@ extern EventTriggerInfo *getEventTriggers(Archive *fout, int *numEventTriggers); extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables); extern PublicationInfo *getPublications(Archive *fout, int *numPublications); +extern void getPublicationNamespaces(Archive *fout); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); extern void getSubscriptions(Archive *fout); diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 46461fb6a18..9901d9e0ba5 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -82,6 +82,7 @@ enum dbObjectTypePriorities PRIO_POLICY, PRIO_PUBLICATION, PRIO_PUBLICATION_REL, + PRIO_PUBLICATION_REL_IN_SCHEMA, PRIO_SUBSCRIPTION, PRIO_DEFAULT_ACL, /* done in ACL pass */ PRIO_EVENT_TRIGGER, /* must be next to last! */ @@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] = PRIO_POLICY, /* DO_POLICY */ PRIO_PUBLICATION, /* DO_PUBLICATION */ PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */ + PRIO_PUBLICATION_REL_IN_SCHEMA, /* DO_PUBLICATION_REL_IN_SCHEMA */ PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */ }; @@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION TABLE (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_REL_IN_SCHEMA: + snprintf(buf, bufsize, + "PUBLICATION TABLES IN SCHEMA (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_SUBSCRIPTION: snprintf(buf, bufsize, "SUBSCRIPTION (ID %d OID %u)", diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 2eece79250a..d293f52b05d 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2321,6 +2321,15 @@ my %tests = ( like => { %full_runs, section_post_data => 1, }, }, + 'CREATE PUBLICATION pub3' => { + create_order => 50, + create_sql => 'CREATE PUBLICATION pub3;', + regexp => qr/^ + \QCREATE PUBLICATION pub3 WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'CREATE SUBSCRIPTION sub1' => { create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub1 @@ -2357,6 +2366,27 @@ my %tests = ( unlike => { exclude_dump_test_schema => 1, }, }, + 'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test' => { + create_order => 51, + create_sql => + 'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test;', + regexp => qr/^ + \QALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test;\E + /xm, + like => { %full_runs, section_post_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + + 'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA public' => { + create_order => 52, + create_sql => + 'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA public;', + regexp => qr/^ + \QALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA public;\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'CREATE SCHEMA public' => { regexp => qr/^CREATE SCHEMA public;/m, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index ea4ca5c05c2..006661412ea 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3146,17 +3146,40 @@ describeOneTableDetails(const char *schemaname, /* print any publications */ if (pset.sversion >= 100000) { - printfPQExpBuffer(&buf, - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - "WHERE pr.prrelid = '%s'\n" - "UNION ALL\n" - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid); + if (pset.sversion >= 150000) + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" + "WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n" + "UNION\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid, oid, oid); + } + else + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid); + } result = PSQLexec(buf.data); if (!result) @@ -5020,6 +5043,8 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; + int pub_schema_tuples = 0; + char **footers = NULL; initPQExpBuffer(&buf); printfPQExpBuffer(&buf, @@ -5052,17 +5077,80 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) appendPQExpBufferStr(&buf, "ORDER BY 1;"); res = PSQLexec(buf.data); - termPQExpBuffer(&buf); if (!res) + { + termPQExpBuffer(&buf); return false; + } myopt.nullPrint = NULL; myopt.title = _("List of schemas"); myopt.translate_header = true; + if (pattern && pset.sversion >= 150000) + { + PGresult *result; + int i; + + printfPQExpBuffer(&buf, + "SELECT pubname \n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" + " JOIN pg_catalog.pg_namespace n ON n.oid = pn.pnnspid \n" + "WHERE n.nspname = '%s'\n" + "ORDER BY 1", + pattern); + result = PSQLexec(buf.data); + if (!result) + { + termPQExpBuffer(&buf); + return false; + } + else + pub_schema_tuples = PQntuples(result); + + if (pub_schema_tuples > 0) + { + /* + * Allocate memory for footers. Size of footers will be 1 (for + * storing "Publications:" string) + publication schema mapping + * count + 1 (for storing NULL). + */ + footers = (char **) pg_malloc((1 + pub_schema_tuples + 1) * sizeof(char *)); + footers[0] = pg_strdup(_("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < pub_schema_tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + footers[i + 1] = pg_strdup(buf.data); + } + + footers[i + 1] = NULL; + myopt.footers = footers; + } + + PQclear(result); + } + printQuery(res, &myopt, pset.queryFout, false, pset.logfile); + termPQExpBuffer(&buf); PQclear(res); + + /* Free the memory allocated for the footer */ + if (footers) + { + char **footer = NULL; + + for (footer = footers; *footer; footer++) + pg_free(*footer); + + pg_free(footers); + } + return true; } @@ -6209,6 +6297,41 @@ listPublications(const char *pattern) return true; } +/* + * Add footer to publication description. + */ +static bool +addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, + bool singlecol, printTableContent *cont) +{ + PGresult *res; + int count = 0; + int i = 0; + + res = PSQLexec(buf->data); + if (!res) + return false; + else + count = PQntuples(res); + + if (count > 0) + printTableAddFooter(cont, _(footermsg)); + + for (i = 0; i < count; i++) + { + if (!singlecol) + printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + else + printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); + + printTableAddFooter(cont, buf->data); + } + + PQclear(res); + return true; +} + /* * \dRp+ * Describes publications including the contents. @@ -6224,6 +6347,9 @@ describePublications(const char *pattern) bool has_pubtruncate; bool has_pubviaroot; + PQExpBufferData title; + printTableContent cont; + if (pset.sversion < 100000) { char sverbuf[32]; @@ -6286,15 +6412,10 @@ describePublications(const char *pattern) const char align = 'l'; int ncols = 5; int nrows = 1; - int tables = 0; - PGresult *tabres; char *pubid = PQgetvalue(res, i, 0); char *pubname = PQgetvalue(res, i, 1); bool puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; - int j; - PQExpBufferData title; printTableOpt myopt = pset.popt.topt; - printTableContent cont; if (has_pubtruncate) ncols++; @@ -6327,6 +6448,7 @@ describePublications(const char *pattern) if (!puballtables) { + /* Get the tables for the specified publication */ printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname\n" "FROM pg_catalog.pg_class c,\n" @@ -6336,31 +6458,22 @@ describePublications(const char *pattern) " AND c.oid = pr.prrelid\n" " AND pr.prpubid = '%s'\n" "ORDER BY 1,2", pubid); + if (!addFooterToPublicationDesc(&buf, "Tables:", false, &cont)) + goto error_return; - tabres = PSQLexec(buf.data); - if (!tabres) + if (pset.sversion >= 150000) { - printTableCleanup(&cont); - PQclear(res); - termPQExpBuffer(&buf); - termPQExpBuffer(&title); - return false; + /* Get the schemas for the specified publication */ + printfPQExpBuffer(&buf, + "SELECT n.nspname\n" + "FROM pg_catalog.pg_namespace n\n" + " JOIN pg_catalog.pg_publication_namespace pn ON n.oid = pn.pnnspid\n" + "WHERE pn.pnpubid = '%s'\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, "Tables from schemas:", + true, &cont)) + goto error_return; } - else - tables = PQntuples(tabres); - - if (tables > 0) - printTableAddFooter(&cont, _("Tables:")); - - for (j = 0; j < tables; j++) - { - printfPQExpBuffer(&buf, " \"%s.%s\"", - PQgetvalue(tabres, j, 0), - PQgetvalue(tabres, j, 1)); - - printTableAddFooter(&cont, buf.data); - } - PQclear(tabres); } printTable(&cont, pset.queryFout, false, pset.logfile); @@ -6373,6 +6486,13 @@ describePublications(const char *pattern) PQclear(res); return true; + +error_return: + printTableCleanup(&cont); + PQclear(res); + termPQExpBuffer(&buf); + termPQExpBuffer(&title); + return false; } /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index ecae9df8eda..8e01f545003 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1644,10 +1644,22 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION */ else if (Matches("ALTER", "PUBLICATION", MatchAny)) - COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET"); + COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET"); + /* ALTER PUBLICATION ADD */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) + COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + /* ALTER PUBLICATION DROP */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) + COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); /* ALTER PUBLICATION SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) - COMPLETE_WITH("(", "TABLE"); + COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES", "IN", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " AND nspname != 'pg_catalog' " + " AND nspname not like 'pg\\_toast%%' " + " AND nspname not like 'pg\\_temp%%' " + " UNION SELECT 'CURRENT_SCHEMA'"); /* ALTER PUBLICATION SET ( */ else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); @@ -2688,17 +2700,31 @@ psql_completion(const char *text, int start, int end) /* CREATE PUBLICATION */ else if (Matches("CREATE", "PUBLICATION", MatchAny)) - COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "WITH ("); + COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "FOR ALL TABLES IN SCHEMA", "WITH ("); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR")) - COMPLETE_WITH("TABLE", "ALL TABLES"); + COMPLETE_WITH("TABLE", "ALL TABLES", "ALL TABLES IN SCHEMA"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL")) - COMPLETE_WITH("TABLES"); - else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES") - || Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny)) + COMPLETE_WITH("TABLES", "TABLES IN SCHEMA"); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES")) + COMPLETE_WITH("IN SCHEMA", "WITH ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny)) COMPLETE_WITH("WITH ("); /* Complete "CREATE PUBLICATION FOR TABLE" with "
, ..." */ else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); + + /* + * Complete "CREATE PUBLICATION FOR ALL TABLES IN SCHEMA , + * ..." + */ + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "IN", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " AND nspname != 'pg_catalog' " + " AND nspname not like 'pg\\_toast%%' " + " AND nspname not like 'pg\\_temp%%' " + " UNION SELECT 'CURRENT_SCHEMA' "); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "IN", "SCHEMA", MatchAny) && (!ends_with(prev_wd, ','))) + COMPLETE_WITH("WITH ("); /* Complete "CREATE PUBLICATION [...] WITH" */ else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("WITH", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 468491825a1..4e276ba6f4d 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202110260 +#define CATALOG_VERSION_NO 202110271 #endif diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 2885f35ccd2..3eca295ff43 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -122,6 +122,7 @@ typedef enum ObjectClass OCLASS_EVENT_TRIGGER, /* pg_event_trigger */ OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ + OCLASS_PUBLICATION_NAMESPACE, /* pg_publication_namespace */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 82f2536c657..1ae439e6f36 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -111,6 +111,12 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetPublicationSchemas(Oid pubid); +extern List *GetSchemaPublications(Oid schemaid); +extern List *GetSchemaPublicationRelations(Oid schemaid, + PublicationPartOpt pub_partopt); +extern List *GetAllSchemaPublicationRelations(Oid puboid, + PublicationPartOpt pub_partopt); extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); @@ -118,6 +124,8 @@ extern List *GetPubPartitionOptionRelations(List *result, extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, bool if_not_exists); +extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, + bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_namespace.h b/src/include/catalog/pg_publication_namespace.h new file mode 100644 index 00000000000..b7e16af8190 --- /dev/null +++ b/src/include/catalog/pg_publication_namespace.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_namespace.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_namespace) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_namespace.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_NAMESPACE_H +#define PG_PUBLICATION_NAMESPACE_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_namespace_d.h" + + +/* ---------------- + * pg_publication_namespace definition. cpp turns this into + * typedef struct FormData_pg_publication_namespace + * ---------------- + */ +CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) +{ + Oid oid; /* oid */ + Oid pnpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid pnnspid BKI_LOOKUP(pg_namespace); /* Oid of the schema */ +} FormData_pg_publication_namespace; + +/* ---------------- + * Form_pg_publication_namespace corresponds to a pointer to a tuple with + * the format of pg_publication_namespace relation. + * ---------------- + */ +typedef FormData_pg_publication_namespace *Form_pg_publication_namespace; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_namespace_oid_index, 8902, PublicationNamespaceObjectIndexId, on pg_publication_namespace using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 8903, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops)); + +#endif /* PG_PUBLICATION_NAMESPACE_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 77a299bb187..4ba68c70ee5 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -26,6 +26,7 @@ extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationById(Oid pubid); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 541e9861bab..7c657c12410 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -487,6 +487,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationObjSpec, T_PublicationTable, /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3138877553f..49123e28a47 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1816,6 +1816,7 @@ typedef enum ObjectType OBJECT_POLICY, OBJECT_PROCEDURE, OBJECT_PUBLICATION, + OBJECT_PUBLICATION_NAMESPACE, OBJECT_PUBLICATION_REL, OBJECT_ROLE, OBJECT_ROUTINE, @@ -3642,12 +3643,32 @@ typedef struct PublicationTable RangeVar *relation; /* relation to be published */ } PublicationTable; +/* + * Publication object type + */ +typedef enum PublicationObjSpecType +{ + PUBLICATIONOBJ_TABLE, /* Table type */ + PUBLICATIONOBJ_REL_IN_SCHEMA, /* Relations in schema type */ + PUBLICATIONOBJ_CURRSCHEMA, /* Get the first element from search_path */ + PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */ +} PublicationObjSpecType; + +typedef struct PublicationObjSpec +{ + NodeTag type; + PublicationObjSpecType pubobjtype; /* type of this publication object */ + char *name; + PublicationTable *pubtable; + int location; /* token location, or -1 if unknown */ +} PublicationObjSpec; + typedef struct CreatePublicationStmt { NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3659,10 +3680,14 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... WITH */ List *options; /* List of DefElem nodes */ - /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ - List *tables; /* List of tables to add/drop */ + /* + * Parameters used for ALTER PUBLICATION ... ADD/DROP/SET publication + * objects. + */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction tableAction; /* What action to perform with the tables */ + DefElemAction action; /* What action to perform with the + * tables/schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348600c..c8cfbc30f60 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -76,6 +76,8 @@ enum SysCacheIdentifier PROCNAMEARGSNSP, PROCOID, PUBLICATIONNAME, + PUBLICATIONNAMESPACE, + PUBLICATIONNAMESPACEMAP, PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out index c3b2b37067a..24d1c7cd280 100644 --- a/src/test/regress/expected/alter_table.out +++ b/src/test/regress/expected/alter_table.out @@ -4557,3 +4557,17 @@ DETAIL: Failing row contains (2, 1). -- ...and doesn't when the partition is detached along with its own partition alter table target_parted detach partition attach_parted; insert into attach_parted_part1 values (2, 1); +-- Test altering table having publication +create schema alter1; +create schema alter2; +create table alter1.t1 (a int); +set client_min_messages = 'ERROR'; +create publication pub1 for table alter1.t1, all tables in schema alter2; +reset client_min_messages; +alter table alter1.t1 set schema alter2; -- should fail +ERROR: cannot move table "t1" to schema "alter2" +DETAIL: The schema "alter2" and same schema's table "t1" cannot be part of the same publication "pub1". +drop publication pub1; +drop schema alter1 cascade; +NOTICE: drop cascades to table alter1.t1 +drop schema alter2 cascade; diff --git a/src/test/regress/expected/object_address.out b/src/test/regress/expected/object_address.out index 388097a6957..a9e7f2eed56 100644 --- a/src/test/regress/expected/object_address.out +++ b/src/test/regress/expected/object_address.out @@ -45,6 +45,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( -- suppress warning that depends on wal_level SET client_min_messages = 'ERROR'; CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; +CREATE PUBLICATION addr_pub_schema FOR ALL TABLES IN SCHEMA addr_nsp; RESET client_min_messages; CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables @@ -427,6 +428,7 @@ WITH objects (type, name, args) AS (VALUES ('transform', '{int}', '{sql}'), ('access method', '{btree}', '{}'), ('publication', '{addr_pub}', '{}'), + ('publication namespace', '{addr_nsp}', '{addr_pub_schema}'), ('publication relation', '{addr_nsp, gentable}', '{addr_pub}'), ('subscription', '{regress_addr_sub}', '{}'), ('statistics object', '{addr_nsp, gentable_stat}', '{}') @@ -490,7 +492,8 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, subscription | | regress_addr_sub | regress_addr_sub | t publication | | addr_pub | addr_pub | t publication relation | | | addr_nsp.gentable in publication addr_pub | t -(49 rows) + publication namespace | | | addr_nsp in publication addr_pub_schema | t +(50 rows) --- --- Cleanup resources @@ -502,6 +505,7 @@ drop cascades to foreign table genftable drop cascades to server integer drop cascades to user mapping for regress_addr_user on server integer DROP PUBLICATION addr_pub; +DROP PUBLICATION addr_pub_schema; DROP SUBSCRIPTION regress_addr_sub; DROP SCHEMA addr_nsp CASCADE; NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 1461e947cdf..215eb899be3 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -258,6 +258,8 @@ NOTICE: checking pg_transform {trftosql} => pg_proc {oid} NOTICE: checking pg_sequence {seqrelid} => pg_class {oid} NOTICE: checking pg_sequence {seqtypid} => pg_type {oid} NOTICE: checking pg_publication {pubowner} => pg_authid {oid} +NOTICE: checking pg_publication_namespace {pnpubid} => pg_publication {oid} +NOTICE: checking pg_publication_namespace {pnnspid} => pg_namespace {oid} NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 82bce9be097..0f4fe4db8f1 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -69,6 +69,78 @@ DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +-- fail - can't add schema to 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_foralltables ADD ALL TABLES IN SCHEMA pub_test; +ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES +DETAIL: Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications. +-- fail - can't drop schema from 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_foralltables DROP ALL TABLES IN SCHEMA pub_test; +ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES +DETAIL: Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications. +-- fail - can't set schema to 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_foralltables SET ALL TABLES IN SCHEMA pub_test; +ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES +DETAIL: Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1; +RESET client_min_messages; +-- should be able to add schema to 'FOR TABLE' publication +ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_fortable + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_tbl1" +Tables from schemas: + "pub_test" + +-- should be able to drop schema from 'FOR TABLE' publication +ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_fortable + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_tbl1" + +-- should be able to set schema to 'FOR TABLE' publication +ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_fortable + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test" + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA pub_test; +RESET client_min_messages; +-- fail - can't create publication with schema and table of the same schema +CREATE PUBLICATION testpub_for_tbl_schema FOR ALL TABLES IN SCHEMA pub_test, TABLE pub_test.testpub_nopk; +ERROR: cannot add relation "pub_test.testpub_nopk" to publication +DETAIL: Table's schema "pub_test" is already part of the publication or part of the specified schema list. +-- fail - can't add a table of the same schema to the schema publication +ALTER PUBLICATION testpub_forschema ADD TABLE pub_test.testpub_nopk; +ERROR: cannot add relation "pub_test.testpub_nopk" to publication +DETAIL: Table's schema "pub_test" is already part of the publication or part of the specified schema list. +-- fail - can't drop a table from the schema publication which isn't in the +-- publication +ALTER PUBLICATION testpub_forschema DROP TABLE pub_test.testpub_nopk; +ERROR: relation "testpub_nopk" is not part of the publication +-- should be able to set table to schema publication +ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk; +\dRp+ testpub_forschema + Publication testpub_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "pub_test.testpub_nopk" + SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; pubname | puballtables ----------------------+-------------- @@ -94,7 +166,7 @@ Publications: (1 row) DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); SET client_min_messages = 'ERROR'; @@ -270,18 +342,23 @@ GRANT CREATE ON DATABASE regression TO regress_publication_user2; SET ROLE regress_publication_user2; SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub2; -- ok +CREATE PUBLICATION testpub3 FOR ALL TABLES IN SCHEMA pub_test; -- fail +ERROR: must be superuser to create FOR ALL TABLES IN SCHEMA publication +CREATE PUBLICATION testpub3; -- ok RESET client_min_messages; ALTER PUBLICATION testpub2 ADD TABLE testpub_tbl1; -- fail ERROR: must be owner of table testpub_tbl1 +ALTER PUBLICATION testpub3 ADD ALL TABLES IN SCHEMA pub_test; -- fail +ERROR: must be superuser to add or set schemas SET ROLE regress_publication_user; GRANT regress_publication_user TO regress_publication_user2; SET ROLE regress_publication_user2; ALTER PUBLICATION testpub2 ADD TABLE testpub_tbl1; -- ok DROP PUBLICATION testpub2; +DROP PUBLICATION testpub3; SET ROLE regress_publication_user; REVOKE CREATE ON DATABASE regression FROM regress_publication_user2; DROP TABLE testpub_parted; -DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default Publication testpub_default @@ -313,11 +390,452 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; testpub_default | regress_publication_user2 | f | t | t | t | f | f (1 row) +-- adding schemas and tables +CREATE SCHEMA pub_test1; +CREATE SCHEMA pub_test2; +CREATE SCHEMA pub_test3; +CREATE SCHEMA "CURRENT_SCHEMA"; +CREATE TABLE pub_test1.tbl (id int, data text); +CREATE TABLE pub_test1.tbl1 (id serial primary key, data text); +CREATE TABLE pub_test2.tbl1 (id serial primary key, data text); +CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int); +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + +CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + "pub_test3" + +-- check create publication on CURRENT_SCHEMA +CREATE PUBLICATION testpub3_forschema FOR ALL TABLES IN SCHEMA CURRENT_SCHEMA; +CREATE PUBLICATION testpub4_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA"; +CREATE PUBLICATION testpub5_forschema FOR ALL TABLES IN SCHEMA CURRENT_SCHEMA, "CURRENT_SCHEMA"; +CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA", CURRENT_SCHEMA; +CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"; +RESET client_min_messages; +\dRp+ testpub3_forschema + Publication testpub3_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "public" + +\dRp+ testpub4_forschema + Publication testpub4_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "CURRENT_SCHEMA" + +\dRp+ testpub5_forschema + Publication testpub5_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "CURRENT_SCHEMA" + "public" + +\dRp+ testpub6_forschema + Publication testpub6_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "CURRENT_SCHEMA" + "public" + +\dRp+ testpub_fortable + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "CURRENT_SCHEMA.CURRENT_SCHEMA" + +-- check create publication on CURRENT_SCHEMA where search_path is not set +SET SEARCH_PATH=''; +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA CURRENT_SCHEMA; +ERROR: no schema has been selected for CURRENT_SCHEMA +RESET SEARCH_PATH; +-- check create publication on CURRENT_SCHEMA where TABLE/ALL TABLES in SCHEMA +-- is not specified +CREATE PUBLICATION testpub_forschema1 FOR CURRENT_SCHEMA; +ERROR: FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s) +LINE 1: CREATE PUBLICATION testpub_forschema1 FOR CURRENT_SCHEMA; + ^ +-- check create publication on CURRENT_SCHEMA along with FOR TABLE +CREATE PUBLICATION testpub_forschema1 FOR TABLE CURRENT_SCHEMA; +ERROR: syntax error at or near "CURRENT_SCHEMA" +LINE 1: CREATE PUBLICATION testpub_forschema1 FOR TABLE CURRENT_SCHE... + ^ +-- check create publication on a schema that does not exist +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +-- check create publication on a system schema +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA pg_catalog; +ERROR: cannot add schema "pg_catalog" to publication +DETAIL: This operation is not supported for system schemas. +-- check create publication on an object which is not schema +CREATE PUBLICATION testpub1_forschema1 FOR ALL TABLES IN SCHEMA testpub_view; +ERROR: schema "testpub_view" does not exist +-- dropping the schema should reflect the change in publication +DROP SCHEMA pub_test3; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- renaming the schema should reflect the change in publication +ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1_renamed" + "pub_test2" + +ALTER SCHEMA pub_test1_renamed RENAME to pub_test1; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- alter publication add schema +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- add non existent schema +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- add a schema which is already added to the publication +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1; +ERROR: schema "pub_test1" is already member of publication "testpub1_forschema" +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- alter publication drop schema +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + +-- drop schema that is not present in the publication +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; +ERROR: tables from schema "pub_test2" are not part of the publication +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + +-- drop a schema that does not exist in the system +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + +-- drop all schemas +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +(1 row) + +-- alter publication set multiple schema +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- alter publication set non-existent schema +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + "pub_test2" + +-- alter publication set it duplicate schemas should set the schemas after +-- removing the duplicate schemas +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + +-- cleanup pub_test1 schema for invalidation tests +ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; +DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; +DROP SCHEMA "CURRENT_SCHEMA" CASCADE; +NOTICE: drop cascades to table "CURRENT_SCHEMA"."CURRENT_SCHEMA" +-- verify relation cache invalidations through update statement for the +-- default REPLICA IDENTITY on the relation, if schema is part of the +-- publication then update will fail because relation's relreplident +-- option will be set, if schema is not part of the publication then update +-- will be successful. +INSERT INTO pub_test1.tbl VALUES(1, 'test'); +-- fail +UPDATE pub_test1.tbl SET id = 2; +ERROR: cannot update table "tbl" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1; +-- success +UPDATE pub_test1.tbl SET id = 2; +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1; +-- fail +UPDATE pub_test1.tbl SET id = 2; +ERROR: cannot update table "tbl" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +-- verify invalidation of partition table having parent and child tables in +-- different schema +CREATE SCHEMA pub_testpart1; +CREATE SCHEMA pub_testpart2; +CREATE TABLE pub_testpart1.parent1 (a int) partition by list (a); +CREATE TABLE pub_testpart2.child_parent1 partition of pub_testpart1.parent1 for values in (1); +INSERT INTO pub_testpart2.child_parent1 values(1); +UPDATE pub_testpart2.child_parent1 set a = 1; +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpubpart_forschema FOR ALL TABLES IN SCHEMA pub_testpart1; +RESET client_min_messages; +-- fail +UPDATE pub_testpart1.parent1 set a = 1; +ERROR: cannot update table "child_parent1" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +UPDATE pub_testpart2.child_parent1 set a = 1; +ERROR: cannot update table "child_parent1" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +DROP PUBLICATION testpubpart_forschema; +-- verify invalidation of partition tables for schema publication that has +-- parent and child tables of different partition hierarchies +CREATE TABLE pub_testpart2.parent2 (a int) partition by list (a); +CREATE TABLE pub_testpart1.child_parent2 partition of pub_testpart2.parent2 for values in (1); +INSERT INTO pub_testpart1.child_parent2 values(1); +UPDATE pub_testpart1.child_parent2 set a = 1; +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpubpart_forschema FOR ALL TABLES IN SCHEMA pub_testpart2; +RESET client_min_messages; +-- fail +UPDATE pub_testpart2.child_parent1 set a = 1; +ERROR: cannot update table "child_parent1" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +UPDATE pub_testpart2.parent2 set a = 1; +ERROR: cannot update table "child_parent2" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +UPDATE pub_testpart1.child_parent2 set a = 1; +ERROR: cannot update table "child_parent2" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +-- alter publication set 'ALL TABLES IN SCHEMA' on an empty publication. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub3_forschema; +RESET client_min_messages; +\dRp+ testpub3_forschema + Publication testpub3_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +(1 row) + +ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub3_forschema + Publication testpub3_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables from schemas: + "pub_test1" + +-- create publication including both 'FOR TABLE' and 'FOR ALL TABLES IN SCHEMA' +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1, TABLE pub_test2.tbl1; +CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1; +RESET client_min_messages; +\dRp+ testpub_forschema_fortable + Publication testpub_forschema_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "pub_test2.tbl1" +Tables from schemas: + "pub_test1" + +\dRp+ testpub_fortable_forschema + Publication testpub_fortable_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "pub_test2.tbl1" +Tables from schemas: + "pub_test1" + +-- fail specifying table without any of 'FOR ALL TABLES IN SCHEMA' or +--'FOR TABLE' or 'FOR ALL TABLES' +CREATE PUBLICATION testpub_error FOR pub_test2.tbl1; +ERROR: FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s) +LINE 1: CREATE PUBLICATION testpub_error FOR pub_test2.tbl1; + ^ +DROP VIEW testpub_view; DROP PUBLICATION testpub_default; DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpub_fortbl; +DROP PUBLICATION testpub1_forschema; +DROP PUBLICATION testpub2_forschema; +DROP PUBLICATION testpub3_forschema; +DROP PUBLICATION testpub_forschema_fortable; +DROP PUBLICATION testpub_fortable_forschema; +DROP PUBLICATION testpubpart_forschema; DROP SCHEMA pub_test CASCADE; NOTICE: drop cascades to table pub_test.testpub_nopk +DROP SCHEMA pub_test1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table pub_test1.tbl +drop cascades to table pub_test1.tbl1 +DROP SCHEMA pub_test2 CASCADE; +NOTICE: drop cascades to table pub_test2.tbl1 +DROP SCHEMA pub_testpart1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table pub_testpart1.parent1 +drop cascades to table pub_testpart1.child_parent2 +DROP SCHEMA pub_testpart2 CASCADE; +NOTICE: drop cascades to table pub_testpart2.parent2 +-- Test the list of partitions published with or without +-- 'PUBLISH_VIA_PARTITION_ROOT' parameter +SET client_min_messages = 'ERROR'; +CREATE SCHEMA sch1; +CREATE SCHEMA sch2; +CREATE TABLE sch1.tbl1 (a int) PARTITION BY RANGE(a); +CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10); +-- Schema publication that does not include the schema that has the parent table +CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+------------ + pub | sch2 | tbl1_part1 +(1 row) + +DROP PUBLICATION pub; +-- Table publication that does not include the parent table +CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+------------ + pub | sch2 | tbl1_part1 +(1 row) + +DROP PUBLICATION pub; +-- Schema publication that does not include the schema that has the parent table +CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0); +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+------------ + pub | sch2 | tbl1_part1 +(1 row) + +DROP PUBLICATION pub; +-- Table publication that does not include the parent table +CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0); +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+------------ + pub | sch2 | tbl1_part1 +(1 row) + +DROP PUBLICATION pub; +DROP TABLE sch2.tbl1_part1; +DROP TABLE sch1.tbl1; +CREATE TABLE sch1.tbl1 (a int) PARTITION BY RANGE(a); +CREATE TABLE sch1.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10); +CREATE TABLE sch1.tbl1_part2 PARTITION OF sch1.tbl1 FOR VALUES FROM (10) to (20); +CREATE TABLE sch1.tbl1_part3 (a int) PARTITION BY RANGE(a); +ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30); +CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +---------+------------+----------- + pub | sch1 | tbl1 +(1 row) + +RESET client_min_messages; +DROP PUBLICATION pub; +DROP TABLE sch1.tbl1; +DROP SCHEMA sch1 cascade; +DROP SCHEMA sch2 cascade; RESET SESSION AUTHORIZATION; DROP ROLE regress_publication_user, regress_publication_user2; DROP ROLE regress_publication_user_dummy; diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 982b6aff539..d04dc66db9e 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -140,6 +140,7 @@ pg_partitioned_table|t pg_policy|t pg_proc|t pg_publication|t +pg_publication_namespace|t pg_publication_rel|t pg_range|t pg_replication_origin|t diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql index 4ddbd16a4e7..5fac2585d90 100644 --- a/src/test/regress/sql/alter_table.sql +++ b/src/test/regress/sql/alter_table.sql @@ -2990,3 +2990,15 @@ insert into attach_parted_part1 values (2, 1); -- ...and doesn't when the partition is detached along with its own partition alter table target_parted detach partition attach_parted; insert into attach_parted_part1 values (2, 1); + +-- Test altering table having publication +create schema alter1; +create schema alter2; +create table alter1.t1 (a int); +set client_min_messages = 'ERROR'; +create publication pub1 for table alter1.t1, all tables in schema alter2; +reset client_min_messages; +alter table alter1.t1 set schema alter2; -- should fail +drop publication pub1; +drop schema alter1 cascade; +drop schema alter2 cascade; diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql index 2f4f66e3e12..2f40156eb48 100644 --- a/src/test/regress/sql/object_address.sql +++ b/src/test/regress/sql/object_address.sql @@ -48,6 +48,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( -- suppress warning that depends on wal_level SET client_min_messages = 'ERROR'; CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; +CREATE PUBLICATION addr_pub_schema FOR ALL TABLES IN SCHEMA addr_nsp; RESET client_min_messages; CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE); CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM addr_nsp.gentable; @@ -197,6 +198,7 @@ WITH objects (type, name, args) AS (VALUES ('transform', '{int}', '{sql}'), ('access method', '{btree}', '{}'), ('publication', '{addr_pub}', '{}'), + ('publication namespace', '{addr_nsp}', '{addr_pub_schema}'), ('publication relation', '{addr_nsp, gentable}', '{addr_pub}'), ('subscription', '{regress_addr_sub}', '{}'), ('statistics object', '{addr_nsp, gentable_stat}', '{}') @@ -215,6 +217,7 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, --- DROP FOREIGN DATA WRAPPER addr_fdw CASCADE; DROP PUBLICATION addr_pub; +DROP PUBLICATION addr_pub_schema; DROP SUBSCRIPTION regress_addr_sub; DROP SCHEMA addr_nsp CASCADE; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index e5745d575b0..85a5302a746 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -51,12 +51,46 @@ ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; +-- fail - can't add schema to 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_foralltables ADD ALL TABLES IN SCHEMA pub_test; +-- fail - can't drop schema from 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_foralltables DROP ALL TABLES IN SCHEMA pub_test; +-- fail - can't set schema to 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_foralltables SET ALL TABLES IN SCHEMA pub_test; + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1; +RESET client_min_messages; +-- should be able to add schema to 'FOR TABLE' publication +ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_fortable +-- should be able to drop schema from 'FOR TABLE' publication +ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_fortable +-- should be able to set schema to 'FOR TABLE' publication +ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test; +\dRp+ testpub_fortable + +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA pub_test; +RESET client_min_messages; +-- fail - can't create publication with schema and table of the same schema +CREATE PUBLICATION testpub_for_tbl_schema FOR ALL TABLES IN SCHEMA pub_test, TABLE pub_test.testpub_nopk; +-- fail - can't add a table of the same schema to the schema publication +ALTER PUBLICATION testpub_forschema ADD TABLE pub_test.testpub_nopk; +-- fail - can't drop a table from the schema publication which isn't in the +-- publication +ALTER PUBLICATION testpub_forschema DROP TABLE pub_test.testpub_nopk; +-- should be able to set table to schema publication +ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk; +\dRp+ testpub_forschema + SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; \d+ testpub_tbl2 \dRp+ testpub_foralltables DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); @@ -154,9 +188,12 @@ GRANT CREATE ON DATABASE regression TO regress_publication_user2; SET ROLE regress_publication_user2; SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub2; -- ok +CREATE PUBLICATION testpub3 FOR ALL TABLES IN SCHEMA pub_test; -- fail +CREATE PUBLICATION testpub3; -- ok RESET client_min_messages; ALTER PUBLICATION testpub2 ADD TABLE testpub_tbl1; -- fail +ALTER PUBLICATION testpub3 ADD ALL TABLES IN SCHEMA pub_test; -- fail SET ROLE regress_publication_user; GRANT regress_publication_user TO regress_publication_user2; @@ -164,12 +201,12 @@ SET ROLE regress_publication_user2; ALTER PUBLICATION testpub2 ADD TABLE testpub_tbl1; -- ok DROP PUBLICATION testpub2; +DROP PUBLICATION testpub3; SET ROLE regress_publication_user; REVOKE CREATE ON DATABASE regression FROM regress_publication_user2; DROP TABLE testpub_parted; -DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default @@ -190,11 +227,251 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default +-- adding schemas and tables +CREATE SCHEMA pub_test1; +CREATE SCHEMA pub_test2; +CREATE SCHEMA pub_test3; +CREATE SCHEMA "CURRENT_SCHEMA"; +CREATE TABLE pub_test1.tbl (id int, data text); +CREATE TABLE pub_test1.tbl1 (id serial primary key, data text); +CREATE TABLE pub_test2.tbl1 (id serial primary key, data text); +CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int); + +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub1_forschema + +CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3; +\dRp+ testpub2_forschema + +-- check create publication on CURRENT_SCHEMA +CREATE PUBLICATION testpub3_forschema FOR ALL TABLES IN SCHEMA CURRENT_SCHEMA; +CREATE PUBLICATION testpub4_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA"; +CREATE PUBLICATION testpub5_forschema FOR ALL TABLES IN SCHEMA CURRENT_SCHEMA, "CURRENT_SCHEMA"; +CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA", CURRENT_SCHEMA; +CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"; + +RESET client_min_messages; + +\dRp+ testpub3_forschema +\dRp+ testpub4_forschema +\dRp+ testpub5_forschema +\dRp+ testpub6_forschema +\dRp+ testpub_fortable + +-- check create publication on CURRENT_SCHEMA where search_path is not set +SET SEARCH_PATH=''; +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA CURRENT_SCHEMA; +RESET SEARCH_PATH; + +-- check create publication on CURRENT_SCHEMA where TABLE/ALL TABLES in SCHEMA +-- is not specified +CREATE PUBLICATION testpub_forschema1 FOR CURRENT_SCHEMA; + +-- check create publication on CURRENT_SCHEMA along with FOR TABLE +CREATE PUBLICATION testpub_forschema1 FOR TABLE CURRENT_SCHEMA; + +-- check create publication on a schema that does not exist +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA non_existent_schema; + +-- check create publication on a system schema +CREATE PUBLICATION testpub_forschema FOR ALL TABLES IN SCHEMA pg_catalog; + +-- check create publication on an object which is not schema +CREATE PUBLICATION testpub1_forschema1 FOR ALL TABLES IN SCHEMA testpub_view; + +-- dropping the schema should reflect the change in publication +DROP SCHEMA pub_test3; +\dRp+ testpub2_forschema + +-- renaming the schema should reflect the change in publication +ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed; +\dRp+ testpub2_forschema + +ALTER SCHEMA pub_test1_renamed RENAME to pub_test1; +\dRp+ testpub2_forschema + +-- alter publication add schema +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2; +\dRp+ testpub1_forschema + +-- add non existent schema +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema; +\dRp+ testpub1_forschema + +-- add a schema which is already added to the publication +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub1_forschema + +-- alter publication drop schema +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; +\dRp+ testpub1_forschema + +-- drop schema that is not present in the publication +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; +\dRp+ testpub1_forschema + +-- drop a schema that does not exist in the system +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema; +\dRp+ testpub1_forschema + +-- drop all schemas +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub1_forschema + +-- alter publication set multiple schema +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2; +\dRp+ testpub1_forschema + +-- alter publication set non-existent schema +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema; +\dRp+ testpub1_forschema + +-- alter publication set it duplicate schemas should set the schemas after +-- removing the duplicate schemas +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; +\dRp+ testpub1_forschema + +-- cleanup pub_test1 schema for invalidation tests +ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; +DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; +DROP SCHEMA "CURRENT_SCHEMA" CASCADE; + +-- verify relation cache invalidations through update statement for the +-- default REPLICA IDENTITY on the relation, if schema is part of the +-- publication then update will fail because relation's relreplident +-- option will be set, if schema is not part of the publication then update +-- will be successful. +INSERT INTO pub_test1.tbl VALUES(1, 'test'); + +-- fail +UPDATE pub_test1.tbl SET id = 2; +ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1; + +-- success +UPDATE pub_test1.tbl SET id = 2; +ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1; + +-- fail +UPDATE pub_test1.tbl SET id = 2; + +-- verify invalidation of partition table having parent and child tables in +-- different schema +CREATE SCHEMA pub_testpart1; +CREATE SCHEMA pub_testpart2; + +CREATE TABLE pub_testpart1.parent1 (a int) partition by list (a); +CREATE TABLE pub_testpart2.child_parent1 partition of pub_testpart1.parent1 for values in (1); +INSERT INTO pub_testpart2.child_parent1 values(1); +UPDATE pub_testpart2.child_parent1 set a = 1; +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpubpart_forschema FOR ALL TABLES IN SCHEMA pub_testpart1; +RESET client_min_messages; + +-- fail +UPDATE pub_testpart1.parent1 set a = 1; +UPDATE pub_testpart2.child_parent1 set a = 1; + +DROP PUBLICATION testpubpart_forschema; + +-- verify invalidation of partition tables for schema publication that has +-- parent and child tables of different partition hierarchies +CREATE TABLE pub_testpart2.parent2 (a int) partition by list (a); +CREATE TABLE pub_testpart1.child_parent2 partition of pub_testpart2.parent2 for values in (1); +INSERT INTO pub_testpart1.child_parent2 values(1); +UPDATE pub_testpart1.child_parent2 set a = 1; +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpubpart_forschema FOR ALL TABLES IN SCHEMA pub_testpart2; +RESET client_min_messages; + +-- fail +UPDATE pub_testpart2.child_parent1 set a = 1; +UPDATE pub_testpart2.parent2 set a = 1; +UPDATE pub_testpart1.child_parent2 set a = 1; + +-- alter publication set 'ALL TABLES IN SCHEMA' on an empty publication. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub3_forschema; +RESET client_min_messages; +\dRp+ testpub3_forschema +ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1; +\dRp+ testpub3_forschema + +-- create publication including both 'FOR TABLE' and 'FOR ALL TABLES IN SCHEMA' +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1, TABLE pub_test2.tbl1; +CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1; +RESET client_min_messages; + +\dRp+ testpub_forschema_fortable +\dRp+ testpub_fortable_forschema + +-- fail specifying table without any of 'FOR ALL TABLES IN SCHEMA' or +--'FOR TABLE' or 'FOR ALL TABLES' +CREATE PUBLICATION testpub_error FOR pub_test2.tbl1; + +DROP VIEW testpub_view; + DROP PUBLICATION testpub_default; DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpub_fortbl; +DROP PUBLICATION testpub1_forschema; +DROP PUBLICATION testpub2_forschema; +DROP PUBLICATION testpub3_forschema; +DROP PUBLICATION testpub_forschema_fortable; +DROP PUBLICATION testpub_fortable_forschema; +DROP PUBLICATION testpubpart_forschema; DROP SCHEMA pub_test CASCADE; +DROP SCHEMA pub_test1 CASCADE; +DROP SCHEMA pub_test2 CASCADE; +DROP SCHEMA pub_testpart1 CASCADE; +DROP SCHEMA pub_testpart2 CASCADE; + +-- Test the list of partitions published with or without +-- 'PUBLISH_VIA_PARTITION_ROOT' parameter +SET client_min_messages = 'ERROR'; +CREATE SCHEMA sch1; +CREATE SCHEMA sch2; +CREATE TABLE sch1.tbl1 (a int) PARTITION BY RANGE(a); +CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10); +-- Schema publication that does not include the schema that has the parent table +CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +SELECT * FROM pg_publication_tables; + +DROP PUBLICATION pub; +-- Table publication that does not include the parent table +CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +SELECT * FROM pg_publication_tables; + +DROP PUBLICATION pub; +-- Schema publication that does not include the schema that has the parent table +CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0); +SELECT * FROM pg_publication_tables; + +DROP PUBLICATION pub; +-- Table publication that does not include the parent table +CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0); +SELECT * FROM pg_publication_tables; + +DROP PUBLICATION pub; +DROP TABLE sch2.tbl1_part1; +DROP TABLE sch1.tbl1; + +CREATE TABLE sch1.tbl1 (a int) PARTITION BY RANGE(a); +CREATE TABLE sch1.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10); +CREATE TABLE sch1.tbl1_part2 PARTITION OF sch1.tbl1 FOR VALUES FROM (10) to (20); +CREATE TABLE sch1.tbl1_part3 (a int) PARTITION BY RANGE(a); +ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30); +CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1); +SELECT * FROM pg_publication_tables; + +RESET client_min_messages; +DROP PUBLICATION pub; +DROP TABLE sch1.tbl1; +DROP SCHEMA sch1 cascade; +DROP SCHEMA sch2 cascade; RESET SESSION AUTHORIZATION; DROP ROLE regress_publication_user, regress_publication_user2; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 40fbcddd201..7bbbb34e2fa 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -778,6 +778,7 @@ FormData_pg_partitioned_table FormData_pg_policy FormData_pg_proc FormData_pg_publication +FormData_pg_publication_namespace FormData_pg_publication_rel FormData_pg_range FormData_pg_replication_origin @@ -834,6 +835,7 @@ Form_pg_partitioned_table Form_pg_policy Form_pg_proc Form_pg_publication +Form_pg_publication_namespace Form_pg_publication_rel Form_pg_range Form_pg_replication_origin @@ -2047,8 +2049,11 @@ PsqlSettings Publication PublicationActions PublicationInfo +PublicationObjSpec +PublicationObjSpecType PublicationPartOpt PublicationRelInfo +PublicationSchemaInfo PublicationTable PullFilter PullFilterOps