|
|
|
|
@@ -25,7 +25,9 @@
|
|
|
|
|
#include "catalog/objectaddress.h"
|
|
|
|
|
#include "catalog/partition.h"
|
|
|
|
|
#include "catalog/pg_inherits.h"
|
|
|
|
|
#include "catalog/pg_namespace.h"
|
|
|
|
|
#include "catalog/pg_publication.h"
|
|
|
|
|
#include "catalog/pg_publication_namespace.h"
|
|
|
|
|
#include "catalog/pg_publication_rel.h"
|
|
|
|
|
#include "catalog/pg_type.h"
|
|
|
|
|
#include "commands/dbcommands.h"
|
|
|
|
|
@@ -34,6 +36,7 @@
|
|
|
|
|
#include "commands/publicationcmds.h"
|
|
|
|
|
#include "funcapi.h"
|
|
|
|
|
#include "miscadmin.h"
|
|
|
|
|
#include "storage/lmgr.h"
|
|
|
|
|
#include "utils/acl.h"
|
|
|
|
|
#include "utils/array.h"
|
|
|
|
|
#include "utils/builtins.h"
|
|
|
|
|
@@ -45,11 +48,16 @@
|
|
|
|
|
#include "utils/syscache.h"
|
|
|
|
|
#include "utils/varlena.h"
|
|
|
|
|
|
|
|
|
|
static List *OpenReliIdList(List *relids);
|
|
|
|
|
static List *OpenTableList(List *tables);
|
|
|
|
|
static void CloseTableList(List *rels);
|
|
|
|
|
static void LockSchemaList(List *schemalist);
|
|
|
|
|
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
|
|
|
|
|
AlterPublicationStmt *stmt);
|
|
|
|
|
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
|
|
|
|
|
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
|
|
|
|
|
AlterPublicationStmt *stmt);
|
|
|
|
|
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
parse_publication_options(ParseState *pstate,
|
|
|
|
|
@@ -135,6 +143,97 @@ parse_publication_options(ParseState *pstate,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Convert the PublicationObjSpecType list into schema oid list and
|
|
|
|
|
* PublicationTable list.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
|
|
|
|
|
List **rels, List **schemas)
|
|
|
|
|
{
|
|
|
|
|
ListCell *cell;
|
|
|
|
|
PublicationObjSpec *pubobj;
|
|
|
|
|
|
|
|
|
|
if (!pubobjspec_list)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
foreach(cell, pubobjspec_list)
|
|
|
|
|
{
|
|
|
|
|
Oid schemaid;
|
|
|
|
|
List *search_path;
|
|
|
|
|
|
|
|
|
|
pubobj = (PublicationObjSpec *) lfirst(cell);
|
|
|
|
|
|
|
|
|
|
switch (pubobj->pubobjtype)
|
|
|
|
|
{
|
|
|
|
|
case PUBLICATIONOBJ_TABLE:
|
|
|
|
|
*rels = lappend(*rels, pubobj->pubtable);
|
|
|
|
|
break;
|
|
|
|
|
case PUBLICATIONOBJ_REL_IN_SCHEMA:
|
|
|
|
|
schemaid = get_namespace_oid(pubobj->name, false);
|
|
|
|
|
|
|
|
|
|
/* Filter out duplicates if user specifies "sch1, sch1" */
|
|
|
|
|
*schemas = list_append_unique_oid(*schemas, schemaid);
|
|
|
|
|
break;
|
|
|
|
|
case PUBLICATIONOBJ_CURRSCHEMA:
|
|
|
|
|
search_path = fetch_search_path(false);
|
|
|
|
|
if (search_path == NIL) /* nothing valid in search_path? */
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
errcode(ERRCODE_UNDEFINED_SCHEMA),
|
|
|
|
|
errmsg("no schema has been selected for CURRENT_SCHEMA"));
|
|
|
|
|
|
|
|
|
|
schemaid = linitial_oid(search_path);
|
|
|
|
|
list_free(search_path);
|
|
|
|
|
|
|
|
|
|
/* Filter out duplicates if user specifies "sch1, sch1" */
|
|
|
|
|
*schemas = list_append_unique_oid(*schemas, schemaid);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
/* shouldn't happen */
|
|
|
|
|
elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check if any of the given relation's schema is a member of the given schema
|
|
|
|
|
* list.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
|
|
|
|
|
PublicationObjSpecType checkobjtype)
|
|
|
|
|
{
|
|
|
|
|
ListCell *lc;
|
|
|
|
|
|
|
|
|
|
foreach(lc, rels)
|
|
|
|
|
{
|
|
|
|
|
PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
|
|
|
|
|
Relation rel = pub_rel->relation;
|
|
|
|
|
Oid relSchemaId = RelationGetNamespace(rel);
|
|
|
|
|
|
|
|
|
|
if (list_member_oid(schemaidlist, relSchemaId))
|
|
|
|
|
{
|
|
|
|
|
if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
|
errmsg("cannot add schema \"%s\" to publication",
|
|
|
|
|
get_namespace_name(relSchemaId)),
|
|
|
|
|
errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
|
|
|
|
|
RelationGetRelationName(rel),
|
|
|
|
|
get_namespace_name(relSchemaId)));
|
|
|
|
|
else if (checkobjtype == PUBLICATIONOBJ_TABLE)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
|
errmsg("cannot add relation \"%s.%s\" to publication",
|
|
|
|
|
get_namespace_name(relSchemaId),
|
|
|
|
|
RelationGetRelationName(rel)),
|
|
|
|
|
errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
|
|
|
|
|
get_namespace_name(relSchemaId)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Create new publication.
|
|
|
|
|
*/
|
|
|
|
|
@@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
|
|
|
|
|
bool publish_via_partition_root_given;
|
|
|
|
|
bool publish_via_partition_root;
|
|
|
|
|
AclResult aclresult;
|
|
|
|
|
List *relations = NIL;
|
|
|
|
|
List *schemaidlist = NIL;
|
|
|
|
|
|
|
|
|
|
/* must have CREATE privilege on database */
|
|
|
|
|
aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
|
|
|
|
|
@@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
|
|
|
|
|
/* Make the changes visible. */
|
|
|
|
|
CommandCounterIncrement();
|
|
|
|
|
|
|
|
|
|
if (stmt->tables)
|
|
|
|
|
{
|
|
|
|
|
List *rels;
|
|
|
|
|
|
|
|
|
|
Assert(list_length(stmt->tables) > 0);
|
|
|
|
|
|
|
|
|
|
rels = OpenTableList(stmt->tables);
|
|
|
|
|
PublicationAddTables(puboid, rels, true, NULL);
|
|
|
|
|
CloseTableList(rels);
|
|
|
|
|
}
|
|
|
|
|
else if (stmt->for_all_tables)
|
|
|
|
|
/* Associate objects with the publication. */
|
|
|
|
|
if (stmt->for_all_tables)
|
|
|
|
|
{
|
|
|
|
|
/* Invalidate relcache so that publication info is rebuilt. */
|
|
|
|
|
CacheInvalidateRelcacheAll();
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
|
|
|
|
|
&schemaidlist);
|
|
|
|
|
|
|
|
|
|
/* FOR ALL TABLES IN SCHEMA requires superuser */
|
|
|
|
|
if (list_length(schemaidlist) > 0 && !superuser())
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
|
|
|
|
errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
|
|
|
|
|
|
|
|
|
|
if (list_length(relations) > 0)
|
|
|
|
|
{
|
|
|
|
|
List *rels;
|
|
|
|
|
|
|
|
|
|
rels = OpenTableList(relations);
|
|
|
|
|
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
|
|
|
|
|
PUBLICATIONOBJ_TABLE);
|
|
|
|
|
PublicationAddTables(puboid, rels, true, NULL);
|
|
|
|
|
CloseTableList(rels);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (list_length(schemaidlist) > 0)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* Schema lock is held until the publication is created to prevent
|
|
|
|
|
* concurrent schema deletion.
|
|
|
|
|
*/
|
|
|
|
|
LockSchemaList(schemaidlist);
|
|
|
|
|
PublicationAddSchemas(puboid, schemaidlist, true, NULL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table_close(rel, RowExclusiveLock);
|
|
|
|
|
|
|
|
|
|
@@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
List *relids = NIL;
|
|
|
|
|
List *schemarelids = NIL;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* For any partitioned tables contained in the publication, we must
|
|
|
|
|
* invalidate all partitions contained in the respective partition
|
|
|
|
|
* trees, not just those explicitly mentioned in the publication.
|
|
|
|
|
*/
|
|
|
|
|
List *relids = GetPublicationRelations(pubform->oid,
|
|
|
|
|
PUBLICATION_PART_ALL);
|
|
|
|
|
relids = GetPublicationRelations(pubform->oid,
|
|
|
|
|
PUBLICATION_PART_ALL);
|
|
|
|
|
schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
|
|
|
|
|
PUBLICATION_PART_ALL);
|
|
|
|
|
relids = list_concat_unique_oid(relids, schemarelids);
|
|
|
|
|
|
|
|
|
|
InvalidatePublicationRels(relids);
|
|
|
|
|
}
|
|
|
|
|
@@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids)
|
|
|
|
|
* Add or remove table to/from publication.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
|
|
|
|
|
HeapTuple tup)
|
|
|
|
|
AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
|
|
|
|
|
List *tables, List *schemaidlist)
|
|
|
|
|
{
|
|
|
|
|
List *rels = NIL;
|
|
|
|
|
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
|
|
|
|
|
Oid pubid = pubform->oid;
|
|
|
|
|
|
|
|
|
|
/* Check that user is allowed to manipulate the publication tables. */
|
|
|
|
|
if (pubform->puballtables)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
|
|
|
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
|
|
|
|
|
NameStr(pubform->pubname)),
|
|
|
|
|
errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
|
|
|
|
|
/*
|
|
|
|
|
* It is quite possible that for the SET case user has not specified any
|
|
|
|
|
* tables in which case we need to remove all the existing tables.
|
|
|
|
|
*/
|
|
|
|
|
if (!tables && stmt->action != DEFELEM_SET)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
Assert(list_length(stmt->tables) > 0);
|
|
|
|
|
rels = OpenTableList(tables);
|
|
|
|
|
|
|
|
|
|
rels = OpenTableList(stmt->tables);
|
|
|
|
|
if (stmt->action == DEFELEM_ADD)
|
|
|
|
|
{
|
|
|
|
|
List *schemas = NIL;
|
|
|
|
|
|
|
|
|
|
if (stmt->tableAction == DEFELEM_ADD)
|
|
|
|
|
/*
|
|
|
|
|
* Check if the relation is member of the existing schema in the
|
|
|
|
|
* publication or member of the schema list specified.
|
|
|
|
|
*/
|
|
|
|
|
schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
|
|
|
|
|
CheckObjSchemaNotAlreadyInPublication(rels, schemas,
|
|
|
|
|
PUBLICATIONOBJ_TABLE);
|
|
|
|
|
PublicationAddTables(pubid, rels, false, stmt);
|
|
|
|
|
else if (stmt->tableAction == DEFELEM_DROP)
|
|
|
|
|
}
|
|
|
|
|
else if (stmt->action == DEFELEM_DROP)
|
|
|
|
|
PublicationDropTables(pubid, rels, false);
|
|
|
|
|
else /* DEFELEM_SET */
|
|
|
|
|
{
|
|
|
|
|
@@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
|
|
|
|
|
List *delrels = NIL;
|
|
|
|
|
ListCell *oldlc;
|
|
|
|
|
|
|
|
|
|
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
|
|
|
|
|
PUBLICATIONOBJ_TABLE);
|
|
|
|
|
|
|
|
|
|
/* Calculate which relations to drop. */
|
|
|
|
|
foreach(oldlc, oldrelids)
|
|
|
|
|
{
|
|
|
|
|
@@ -440,11 +581,111 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
|
|
|
|
|
CloseTableList(rels);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Alter the publication schemas.
|
|
|
|
|
*
|
|
|
|
|
* Add or remove schemas to/from publication.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
AlterPublicationSchemas(AlterPublicationStmt *stmt,
|
|
|
|
|
HeapTuple tup, List *schemaidlist)
|
|
|
|
|
{
|
|
|
|
|
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* It is quite possible that for the SET case user has not specified any
|
|
|
|
|
* schemas in which case we need to remove all the existing schemas.
|
|
|
|
|
*/
|
|
|
|
|
if (!schemaidlist && stmt->action != DEFELEM_SET)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Schema lock is held until the publication is altered to prevent
|
|
|
|
|
* concurrent schema deletion.
|
|
|
|
|
*/
|
|
|
|
|
LockSchemaList(schemaidlist);
|
|
|
|
|
if (stmt->action == DEFELEM_ADD)
|
|
|
|
|
{
|
|
|
|
|
List *rels;
|
|
|
|
|
List *reloids;
|
|
|
|
|
|
|
|
|
|
reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
|
|
|
|
|
rels = OpenReliIdList(reloids);
|
|
|
|
|
|
|
|
|
|
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
|
|
|
|
|
PUBLICATIONOBJ_REL_IN_SCHEMA);
|
|
|
|
|
|
|
|
|
|
CloseTableList(rels);
|
|
|
|
|
PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
|
|
|
|
|
}
|
|
|
|
|
else if (stmt->action == DEFELEM_DROP)
|
|
|
|
|
PublicationDropSchemas(pubform->oid, schemaidlist, false);
|
|
|
|
|
else /* DEFELEM_SET */
|
|
|
|
|
{
|
|
|
|
|
List *oldschemaids = GetPublicationSchemas(pubform->oid);
|
|
|
|
|
List *delschemas = NIL;
|
|
|
|
|
|
|
|
|
|
/* Identify which schemas should be dropped */
|
|
|
|
|
delschemas = list_difference_oid(oldschemaids, schemaidlist);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Schema lock is held until the publication is altered to prevent
|
|
|
|
|
* concurrent schema deletion.
|
|
|
|
|
*/
|
|
|
|
|
LockSchemaList(delschemas);
|
|
|
|
|
|
|
|
|
|
/* And drop them */
|
|
|
|
|
PublicationDropSchemas(pubform->oid, delschemas, true);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Don't bother calculating the difference for adding, we'll catch and
|
|
|
|
|
* skip existing ones when doing catalog update.
|
|
|
|
|
*/
|
|
|
|
|
PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check if relations and schemas can be in a given publication and throw
|
|
|
|
|
* appropriate error if not.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
|
|
|
|
|
List *tables, List *schemaidlist)
|
|
|
|
|
{
|
|
|
|
|
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
|
|
|
|
|
|
|
|
|
|
if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) &&
|
|
|
|
|
schemaidlist && !superuser())
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
|
|
|
|
errmsg("must be superuser to add or set schemas")));
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check that user is allowed to manipulate the publication tables in
|
|
|
|
|
* schema
|
|
|
|
|
*/
|
|
|
|
|
if (schemaidlist && pubform->puballtables)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
|
|
|
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
|
|
|
|
|
NameStr(pubform->pubname)),
|
|
|
|
|
errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
|
|
|
|
|
|
|
|
|
|
/* Check that user is allowed to manipulate the publication tables. */
|
|
|
|
|
if (tables && pubform->puballtables)
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
|
|
|
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
|
|
|
|
|
NameStr(pubform->pubname)),
|
|
|
|
|
errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Alter the existing publication.
|
|
|
|
|
*
|
|
|
|
|
* This is dispatcher function for AlterPublicationOptions and
|
|
|
|
|
* AlterPublicationTables.
|
|
|
|
|
* This is dispatcher function for AlterPublicationOptions,
|
|
|
|
|
* AlterPublicationSchemas and AlterPublicationTables.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
|
|
|
|
|
@@ -474,7 +715,41 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
|
|
|
|
|
if (stmt->options)
|
|
|
|
|
AlterPublicationOptions(pstate, stmt, rel, tup);
|
|
|
|
|
else
|
|
|
|
|
AlterPublicationTables(stmt, rel, tup);
|
|
|
|
|
{
|
|
|
|
|
List *relations = NIL;
|
|
|
|
|
List *schemaidlist = NIL;
|
|
|
|
|
|
|
|
|
|
ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
|
|
|
|
|
&schemaidlist);
|
|
|
|
|
|
|
|
|
|
CheckAlterPublication(stmt, tup, relations, schemaidlist);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Lock the publication so nobody else can do anything with it. This
|
|
|
|
|
* prevents concurrent alter to add table(s) that were already going
|
|
|
|
|
* to become part of the publication by adding corresponding schema(s)
|
|
|
|
|
* via this command and similarly it will prevent the concurrent
|
|
|
|
|
* addition of schema(s) for which there is any corresponding table
|
|
|
|
|
* being added by this command.
|
|
|
|
|
*/
|
|
|
|
|
LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
|
|
|
|
|
AccessExclusiveLock);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* It is possible that by the time we acquire the lock on publication,
|
|
|
|
|
* concurrent DDL has removed it. We can test this by checking the
|
|
|
|
|
* existence of publication.
|
|
|
|
|
*/
|
|
|
|
|
if (!SearchSysCacheExists1(PUBLICATIONOID,
|
|
|
|
|
ObjectIdGetDatum(pubform->oid)))
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
|
|
|
errmsg("publication \"%s\" does not exist",
|
|
|
|
|
stmt->pubname));
|
|
|
|
|
|
|
|
|
|
AlterPublicationTables(stmt, tup, relations, schemaidlist);
|
|
|
|
|
AlterPublicationSchemas(stmt, tup, schemaidlist);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Cleanup. */
|
|
|
|
|
heap_freetuple(tup);
|
|
|
|
|
@@ -551,10 +826,72 @@ RemovePublicationById(Oid pubid)
|
|
|
|
|
table_close(rel, RowExclusiveLock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Remove schema from publication by mapping OID.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
RemovePublicationSchemaById(Oid psoid)
|
|
|
|
|
{
|
|
|
|
|
Relation rel;
|
|
|
|
|
HeapTuple tup;
|
|
|
|
|
List *schemaRels = NIL;
|
|
|
|
|
Form_pg_publication_namespace pubsch;
|
|
|
|
|
|
|
|
|
|
rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
|
|
|
|
|
|
|
|
|
|
tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
|
|
|
|
|
|
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
|
|
|
elog(ERROR, "cache lookup failed for publication schema %u", psoid);
|
|
|
|
|
|
|
|
|
|
pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Invalidate relcache so that publication info is rebuilt. See
|
|
|
|
|
* RemovePublicationRelById for why we need to consider all the
|
|
|
|
|
* partitions.
|
|
|
|
|
*/
|
|
|
|
|
schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
|
|
|
|
|
PUBLICATION_PART_ALL);
|
|
|
|
|
InvalidatePublicationRels(schemaRels);
|
|
|
|
|
|
|
|
|
|
CatalogTupleDelete(rel, &tup->t_self);
|
|
|
|
|
|
|
|
|
|
ReleaseSysCache(tup);
|
|
|
|
|
|
|
|
|
|
table_close(rel, RowExclusiveLock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Open relations specified by a relid list.
|
|
|
|
|
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
|
|
|
|
|
* add them to a publication.
|
|
|
|
|
*/
|
|
|
|
|
static List *
|
|
|
|
|
OpenReliIdList(List *relids)
|
|
|
|
|
{
|
|
|
|
|
ListCell *lc;
|
|
|
|
|
List *rels = NIL;
|
|
|
|
|
|
|
|
|
|
foreach(lc, relids)
|
|
|
|
|
{
|
|
|
|
|
PublicationRelInfo *pub_rel;
|
|
|
|
|
Oid relid = lfirst_oid(lc);
|
|
|
|
|
Relation rel = table_open(relid,
|
|
|
|
|
ShareUpdateExclusiveLock);
|
|
|
|
|
|
|
|
|
|
pub_rel = palloc(sizeof(PublicationRelInfo));
|
|
|
|
|
pub_rel->relation = rel;
|
|
|
|
|
rels = lappend(rels, pub_rel);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rels;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Open relations specified by a PublicationTable list.
|
|
|
|
|
* In the returned list of PublicationRelInfo, tables are locked
|
|
|
|
|
* in ShareUpdateExclusiveLock mode in order to add them to a publication.
|
|
|
|
|
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
|
|
|
|
|
* add them to a publication.
|
|
|
|
|
*/
|
|
|
|
|
static List *
|
|
|
|
|
OpenTableList(List *tables)
|
|
|
|
|
@@ -658,6 +995,35 @@ CloseTableList(List *rels)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Lock the schemas specified in the schema list in AccessShareLock mode in
|
|
|
|
|
* order to prevent concurrent schema deletion.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
LockSchemaList(List *schemalist)
|
|
|
|
|
{
|
|
|
|
|
ListCell *lc;
|
|
|
|
|
|
|
|
|
|
foreach(lc, schemalist)
|
|
|
|
|
{
|
|
|
|
|
Oid schemaid = lfirst_oid(lc);
|
|
|
|
|
|
|
|
|
|
/* Allow query cancel in case this takes a long time */
|
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
|
LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* It is possible that by the time we acquire the lock on schema,
|
|
|
|
|
* concurrent DDL has removed it. We can test this by checking the
|
|
|
|
|
* existence of schema.
|
|
|
|
|
*/
|
|
|
|
|
if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
errcode(ERRCODE_UNDEFINED_SCHEMA),
|
|
|
|
|
errmsg("schema with OID %u does not exist", schemaid));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Add listed tables to the publication.
|
|
|
|
|
*/
|
|
|
|
|
@@ -727,6 +1093,68 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Add listed schemas to the publication.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
|
|
|
|
|
AlterPublicationStmt *stmt)
|
|
|
|
|
{
|
|
|
|
|
ListCell *lc;
|
|
|
|
|
|
|
|
|
|
Assert(!stmt || !stmt->for_all_tables);
|
|
|
|
|
|
|
|
|
|
foreach(lc, schemas)
|
|
|
|
|
{
|
|
|
|
|
Oid schemaid = lfirst_oid(lc);
|
|
|
|
|
ObjectAddress obj;
|
|
|
|
|
|
|
|
|
|
obj = publication_add_schema(pubid, schemaid, if_not_exists);
|
|
|
|
|
if (stmt)
|
|
|
|
|
{
|
|
|
|
|
EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
|
|
|
|
|
(Node *) stmt);
|
|
|
|
|
|
|
|
|
|
InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
|
|
|
|
|
obj.objectId, 0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Remove listed schemas from the publication.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
|
|
|
|
|
{
|
|
|
|
|
ObjectAddress obj;
|
|
|
|
|
ListCell *lc;
|
|
|
|
|
Oid psid;
|
|
|
|
|
|
|
|
|
|
foreach(lc, schemas)
|
|
|
|
|
{
|
|
|
|
|
Oid schemaid = lfirst_oid(lc);
|
|
|
|
|
|
|
|
|
|
psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
|
|
|
|
|
Anum_pg_publication_namespace_oid,
|
|
|
|
|
ObjectIdGetDatum(schemaid),
|
|
|
|
|
ObjectIdGetDatum(pubid));
|
|
|
|
|
if (!OidIsValid(psid))
|
|
|
|
|
{
|
|
|
|
|
if (missing_ok)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
ereport(ERROR,
|
|
|
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
|
|
|
errmsg("tables from schema \"%s\" are not part of the publication",
|
|
|
|
|
get_namespace_name(schemaid))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
|
|
|
|
|
performDeletion(&obj, DROP_CASCADE, 0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Internal workhorse for changing a publication owner
|
|
|
|
|
*/
|
|
|
|
|
|