1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Allow publications with schema and table of the same schema.

We previously thought that allowing such cases can confuse users when they
specify DROP TABLES IN SCHEMA but that doesn't seem to be the case based
on discussion. This helps to uplift the restriction during
ALTER TABLE ... SET SCHEMA which used to ensure that we couldn't end up
with a publication having both a schema and the same schema's table.

To allow this, we need to forbid having any schema on a publication if
column lists on a table are specified (and vice versa). This is because
otherwise we still need a restriction during ALTER TABLE ... SET SCHEMA to
forbid cases where it could lead to a publication having both a schema and
the same schema's table with column list.

Based on suggestions by Peter Eisentraut.

Author: Hou Zhijie and Vignesh C
Reviewed-By: Peter Smith, Amit Kapila
Backpatch-through: 15, where it was introduced
Discussion: https://postgr.es/m/2729c9e2-9aac-8cda-f2f4-34f2bcc18f4e@enterprisedb.com
This commit is contained in:
Amit Kapila
2022-09-23 08:08:24 +05:30
parent dd6070bc81
commit b7256753ec
13 changed files with 251 additions and 185 deletions

View File

@ -66,7 +66,6 @@ typedef struct rf_context
Oid parentid; /* relid of the parent relation */
} rf_context;
static List *OpenRelIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
static void LockSchemaList(List *schemalist);
@ -214,44 +213,6 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
}
}
/*
* Check if any of the given relation's schema is a member of the given schema
* list.
*/
static void
CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
PublicationObjSpecType checkobjtype)
{
ListCell *lc;
foreach(lc, rels)
{
PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
Relation rel = pub_rel->relation;
Oid relSchemaId = RelationGetNamespace(rel);
if (list_member_oid(schemaidlist, relSchemaId))
{
if (checkobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add schema \"%s\" to publication",
get_namespace_name(relSchemaId)),
errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
RelationGetRelationName(rel),
get_namespace_name(relSchemaId)));
else if (checkobjtype == PUBLICATIONOBJ_TABLE)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add relation \"%s.%s\" to publication",
get_namespace_name(relSchemaId),
RelationGetRelationName(rel)),
errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
get_namespace_name(relSchemaId)));
}
}
}
/*
* Returns true if any of the columns used in the row filter WHERE expression is
* not part of REPLICA IDENTITY, false otherwise.
@ -721,7 +682,7 @@ TransformPubWhereClauses(List *tables, const char *queryString,
*/
static void
CheckPubRelationColumnList(List *tables, const char *queryString,
bool pubviaroot)
bool publish_schema, bool pubviaroot)
{
ListCell *lc;
@ -732,6 +693,24 @@ CheckPubRelationColumnList(List *tables, const char *queryString,
if (pri->columns == NIL)
continue;
/*
* Disallow specifying column list if any schema is in the
* publication.
*
* XXX We could instead just forbid the case when the publication
* tries to publish the table with a column list and a schema for that
* table. However, if we do that then we need a restriction during
* ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
* seem to be a good idea.
*/
if (publish_schema)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use publication column list for relation \"%s.%s\"",
get_namespace_name(RelationGetNamespace(pri->relation)),
RelationGetRelationName(pri->relation)),
errdetail("Column list cannot be specified if any schema is part of the publication or specified in the list."));
/*
* If the publication doesn't publish changes via the root partitioned
* table, the partition's column list will be used. So disallow using
@ -858,13 +837,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
List *rels;
rels = OpenTableList(relations);
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLE);
TransformPubWhereClauses(rels, pstate->p_sourcetext,
publish_via_partition_root);
CheckPubRelationColumnList(rels, pstate->p_sourcetext,
schemaidlist != NIL,
publish_via_partition_root);
PublicationAddTables(puboid, rels, true, NULL);
@ -1110,8 +1087,8 @@ InvalidatePublicationRels(List *relids)
*/
static void
AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
List *tables, List *schemaidlist,
const char *queryString)
List *tables, const char *queryString,
bool publish_schema)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
@ -1129,19 +1106,12 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
if (stmt->action == AP_AddObjects)
{
List *schemas = NIL;
/*
* Check if the relation is member of the existing schema in the
* publication or member of the schema list specified.
*/
schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
CheckObjSchemaNotAlreadyInPublication(rels, schemas,
PUBLICATIONOBJ_TABLE);
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
publish_schema |= is_schema_publication(pubid);
CheckPubRelationColumnList(rels, queryString, publish_schema,
pubform->pubviaroot);
PublicationAddTables(pubid, rels, false, stmt);
}
@ -1154,12 +1124,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
List *delrels = NIL;
ListCell *oldlc;
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLE);
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
CheckPubRelationColumnList(rels, queryString, publish_schema,
pubform->pubviaroot);
/*
* To recreate the relation list for the publication, look for
@ -1308,16 +1276,35 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
LockSchemaList(schemaidlist);
if (stmt->action == AP_AddObjects)
{
List *rels;
ListCell *lc;
List *reloids;
reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
rels = OpenRelIdList(reloids);
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLES_IN_SCHEMA);
foreach(lc, reloids)
{
HeapTuple coltuple;
coltuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(lfirst_oid(lc)),
ObjectIdGetDatum(pubform->oid));
if (!HeapTupleIsValid(coltuple))
continue;
/*
* Disallow adding schema if column list is already part of the
* publication. See CheckPubRelationColumnList.
*/
if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add schema to the publication"),
errdetail("Schema cannot be added if any table that specifies column list is already part of the publication."));
ReleaseSysCache(coltuple);
}
CloseTableList(rels);
PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
}
else if (stmt->action == AP_DropObjects)
@ -1429,14 +1416,7 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
heap_freetuple(tup);
/*
* Lock the publication so nobody else can do anything with it. This
* prevents concurrent alter to add table(s) that were already going
* to become part of the publication by adding corresponding schema(s)
* via this command and similarly it will prevent the concurrent
* addition of schema(s) for which there is any corresponding table
* being added by this command.
*/
/* Lock the publication so nobody else can do anything with it. */
LockDatabaseObject(PublicationRelationId, pubid, 0,
AccessExclusiveLock);
@ -1453,8 +1433,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
errmsg("publication \"%s\" does not exist",
stmt->pubname));
AlterPublicationTables(stmt, tup, relations, schemaidlist,
pstate->p_sourcetext);
AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
schemaidlist != NIL);
AlterPublicationSchemas(stmt, tup, schemaidlist);
}
@ -1569,32 +1549,6 @@ RemovePublicationSchemaById(Oid psoid)
table_close(rel, RowExclusiveLock);
}
/*
* Open relations specified by a relid list.
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
* add them to a publication.
*/
static List *
OpenRelIdList(List *relids)
{
ListCell *lc;
List *rels = NIL;
foreach(lc, relids)
{
PublicationRelInfo *pub_rel;
Oid relid = lfirst_oid(lc);
Relation rel = table_open(relid,
ShareUpdateExclusiveLock);
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
rels = lappend(rels, pub_rel);
}
return rels;
}
/*
* Open relations specified by a PublicationTable list.
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to

View File

@ -16449,33 +16449,6 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema)
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
/*
* Check that setting the relation to a different schema won't result in a
* publication having both a schema and the same schema's table, as this
* is not supported.
*/
if (stmt->objectType == OBJECT_TABLE)
{
ListCell *lc;
List *schemaPubids = GetSchemaPublications(nspOid);
List *relPubids = GetRelationPublications(RelationGetRelid(rel));
foreach(lc, relPubids)
{
Oid pubid = lfirst_oid(lc);
if (list_member_oid(schemaPubids, pubid))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move table \"%s\" to schema \"%s\"",
RelationGetRelationName(rel), stmt->newschema),
errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".",
stmt->newschema,
RelationGetRelationName(rel),
get_publication_name(pubid, false)));
}
}
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid);