1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-18 02:02:55 +03:00

Add "ALL SEQUENCES" support to publications.

This patch adds support for the ALL SEQUENCES clause in publications,
enabling synchronization/replication of all sequences that is useful for
upgrades.

Publications can now include all sequences via FOR ALL SEQUENCES.
psql enhancements:
\d shows publications for a given sequence.
\dRp indicates if a publication includes all sequences.

ALL SEQUENCES can be combined with ALL TABLES, but not with other options
like TABLE or TABLES IN SCHEMA. We can extend support for more granular
clauses in future.

The view pg_publication_sequences provides information about the mapping
between publications and sequences.

This patch enables publishing of sequences; subscriber-side support will
be added in upcoming patches.

Author: vignesh C <vignesh21@gmail.com>
Author: Tomas Vondra <tomas@vondra.me>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-10-09 03:48:54 +00:00
parent ef5e60a9d3
commit 96b3784973
23 changed files with 929 additions and 354 deletions

View File

@@ -115,8 +115,10 @@ check_publication_add_schema(Oid schemaid)
* Returns if relation represented by oid and Form_pg_class entry
* is publishable.
*
* Does same checks as check_publication_add_relation() above, but does not
* need relation to be opened and also does not throw errors.
* Does same checks as check_publication_add_relation() above except for
* RELKIND_SEQUENCE, but does not need relation to be opened and also does
* not throw errors. Here, the additional check is to support ALL SEQUENCES
* publication.
*
* XXX This also excludes all tables with relid < FirstNormalObjectId,
* ie all tables created during initdb. This mainly affects the preinstalled
@@ -134,7 +136,8 @@ static bool
is_publishable_class(Oid relid, Form_pg_class reltuple)
{
return (reltuple->relkind == RELKIND_RELATION ||
reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
reltuple->relkind == RELKIND_SEQUENCE) &&
!IsCatalogRelationOid(relid) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
@@ -773,8 +776,8 @@ GetRelationPublications(Oid relid)
/*
* Gets list of relation oids for a publication.
*
* This should only be used FOR TABLE publications, the FOR ALL TABLES
* should use GetAllTablesPublicationRelations().
* This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
* should use GetAllPublicationRelations().
*/
List *
GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
@@ -854,14 +857,16 @@ GetAllTablesPublications(void)
}
/*
* Gets list of all relation published by FOR ALL TABLES publication(s).
* Gets list of all relations published by FOR ALL TABLES/SEQUENCES
* publication(s).
*
* If the publication publishes partition changes via their respective root
* partitioned tables, we must exclude partitions in favor of including the
* root partitioned tables.
* root partitioned tables. This is not applicable to FOR ALL SEQUENCES
* publication.
*/
List *
GetAllTablesPublicationRelations(bool pubviaroot)
GetAllPublicationRelations(char relkind, bool pubviaroot)
{
Relation classRel;
ScanKeyData key[1];
@@ -869,12 +874,14 @@ GetAllTablesPublicationRelations(bool pubviaroot)
HeapTuple tuple;
List *result = NIL;
Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
classRel = table_open(RelationRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_class_relkind,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(RELKIND_RELATION));
CharGetDatum(relkind));
scan = table_beginscan_catalog(classRel, 1, key);
@@ -1083,6 +1090,7 @@ GetPublication(Oid pubid)
pub->oid = pubid;
pub->name = pstrdup(NameStr(pubform->pubname));
pub->alltables = pubform->puballtables;
pub->allsequences = pubform->puballsequences;
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
@@ -1160,7 +1168,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* those. Otherwise, get the partitioned table itself.
*/
if (pub_elem->alltables)
pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION,
pub_elem->pubviaroot);
else
{
List *relids,
@@ -1332,3 +1341,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
/*
* Returns Oids of sequences in a publication.
*/
Datum
pg_get_publication_sequences(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
List *sequences = NIL;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
Publication *publication;
MemoryContext oldcontext;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
publication = GetPublicationByName(pubname, false);
if (publication->allsequences)
sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false);
funcctx->user_fctx = (void *) sequences;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
sequences = (List *) funcctx->user_fctx;
if (funcctx->call_cntr < list_length(sequences))
{
Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
}
SRF_RETURN_DONE(funcctx);
}

View File

@@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPT.relid;
CREATE VIEW pg_publication_sequences AS
SELECT
P.pubname AS pubname,
N.nspname AS schemaname,
C.relname AS sequencename
FROM pg_publication P,
LATERAL pg_get_publication_sequences(P.pubname) GPS,
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPS.relid;
CREATE VIEW pg_locks AS
SELECT * FROM pg_lock_status() AS L;

View File

@@ -847,11 +847,14 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
aclcheck_error(aclresult, OBJECT_DATABASE,
get_database_name(MyDatabaseId));
/* FOR ALL TABLES requires superuser */
if (stmt->for_all_tables && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create FOR ALL TABLES publication")));
/* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
if (!superuser())
{
if (stmt->for_all_tables || stmt->for_all_sequences)
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
}
rel = table_open(PublicationRelationId, RowExclusiveLock);
@@ -880,11 +883,20 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
&publish_generated_columns_given,
&publish_generated_columns);
if (stmt->for_all_sequences &&
(publish_given || publish_via_partition_root_given ||
publish_generated_columns_given))
ereport(NOTICE,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
Anum_pg_publication_oid);
values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
values[Anum_pg_publication_puballsequences - 1] =
BoolGetDatum(stmt->for_all_sequences);
values[Anum_pg_publication_pubinsert - 1] =
BoolGetDatum(pubactions.pubinsert);
values[Anum_pg_publication_pubupdate - 1] =
@@ -914,10 +926,14 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
/* Associate objects with the publication. */
if (stmt->for_all_tables)
{
/* Invalidate relcache so that publication info is rebuilt. */
/*
* Invalidate relcache so that publication info is rebuilt. Sequences
* publication doesn't require invalidation, as replica identity
* checks don't apply to them.
*/
CacheInvalidateRelcacheAll();
}
else
else if (!stmt->for_all_sequences)
{
ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
&schemaidlist);
@@ -989,6 +1005,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
List *root_relids = NIL;
ListCell *lc;
pubform = (Form_pg_publication) GETSTRUCT(tup);
parse_publication_options(pstate,
stmt->options,
&publish_given, &pubactions,
@@ -997,7 +1015,12 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
&publish_generated_columns_given,
&publish_generated_columns);
pubform = (Form_pg_publication) GETSTRUCT(tup);
if (pubform->puballsequences &&
(publish_given || publish_via_partition_root_given ||
publish_generated_columns_given))
ereport(NOTICE,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
/*
* If the publication doesn't publish changes via the root partitioned
@@ -1451,20 +1474,50 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
* Check that user is allowed to manipulate the publication tables in
* schema
*/
if (schemaidlist && pubform->puballtables)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
NameStr(pubform->pubname)),
errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
{
if (pubform->puballtables && pubform->puballsequences)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
NameStr(pubform->pubname)),
errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
else if (pubform->puballtables)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
NameStr(pubform->pubname)),
errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
else
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
NameStr(pubform->pubname)),
errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
}
/* Check that user is allowed to manipulate the publication tables. */
if (tables && pubform->puballtables)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
NameStr(pubform->pubname)),
errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
if (tables && (pubform->puballtables || pubform->puballsequences))
{
if (pubform->puballtables && pubform->puballsequences)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
NameStr(pubform->pubname)),
errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
else if (pubform->puballtables)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
NameStr(pubform->pubname)),
errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
else
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
NameStr(pubform->pubname)),
errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
}
}
/*
@@ -2014,19 +2067,16 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
aclcheck_error(aclresult, OBJECT_DATABASE,
get_database_name(MyDatabaseId));
if (form->puballtables && !superuser_arg(newOwnerId))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to change owner of publication \"%s\"",
NameStr(form->pubname)),
errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to change owner of publication \"%s\"",
NameStr(form->pubname)),
errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
if (!superuser_arg(newOwnerId))
{
if (form->puballtables || form->puballsequences ||
is_schema_publication(form->oid))
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to change owner of publication \"%s\"",
NameStr(form->pubname)),
errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
}
}
form->pubowner = newOwnerId;

View File

@@ -202,6 +202,10 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
bool *not_valid, bool *no_inherit, core_yyscan_t yyscanner);
static PartitionStrategy parsePartitionStrategy(char *strategy, int location,
core_yyscan_t yyscanner);
static void preprocess_pub_all_objtype_list(List *all_objects_list,
bool *all_tables,
bool *all_sequences,
core_yyscan_t yyscanner);
static void preprocess_pubobj_list(List *pubobjspec_list,
core_yyscan_t yyscanner);
static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
@@ -260,6 +264,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
PartitionBoundSpec *partboundspec;
RoleSpec *rolespec;
PublicationObjSpec *publicationobjectspec;
PublicationAllObjSpec *publicationallobjectspec;
struct SelectLimit *selectlimit;
SetQuantifier setquantifier;
struct GroupClause *groupclause;
@@ -447,7 +452,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
transform_element_list transform_type_list
TriggerTransitions TriggerReferencing
vacuum_relation_list opt_vacuum_relation_list
drop_option_list pub_obj_list
drop_option_list pub_obj_list pub_obj_type_list
%type <retclause> returning_clause
%type <node> returning_option
@@ -585,6 +590,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> var_value zone_value
%type <rolespec> auth_ident RoleSpec opt_granted_by
%type <publicationobjectspec> PublicationObjSpec
%type <publicationallobjectspec> PublicationAllObjSpec
%type <keyword> unreserved_keyword type_func_name_keyword
%type <keyword> col_name_keyword reserved_keyword
@@ -10704,7 +10710,12 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
*
* CREATE PUBLICATION name [WITH options]
*
* CREATE PUBLICATION FOR ALL TABLES [WITH options]
* CREATE PUBLICATION FOR ALL pub_obj_type [, ...] [WITH options]
*
* pub_obj_type is one of:
*
* TABLES
* SEQUENCES
*
* CREATE PUBLICATION FOR pub_obj [, ...] [WITH options]
*
@@ -10724,13 +10735,16 @@ CreatePublicationStmt:
n->options = $4;
$$ = (Node *) n;
}
| CREATE PUBLICATION name FOR ALL TABLES opt_definition
| CREATE PUBLICATION name FOR pub_obj_type_list opt_definition
{
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
n->pubname = $3;
n->options = $7;
n->for_all_tables = true;
n->pubobjects = (List *) $5;
preprocess_pub_all_objtype_list($5, &n->for_all_tables,
&n->for_all_sequences,
yyscanner);
n->options = $6;
$$ = (Node *) n;
}
| CREATE PUBLICATION name FOR pub_obj_list opt_definition
@@ -10842,6 +10856,28 @@ pub_obj_list: PublicationObjSpec
{ $$ = lappend($1, $3); }
;
PublicationAllObjSpec:
ALL TABLES
{
$$ = makeNode(PublicationAllObjSpec);
$$->pubobjtype = PUBLICATION_ALL_TABLES;
$$->location = @1;
}
| ALL SEQUENCES
{
$$ = makeNode(PublicationAllObjSpec);
$$->pubobjtype = PUBLICATION_ALL_SEQUENCES;
$$->location = @1;
}
;
pub_obj_type_list: PublicationAllObjSpec
{ $$ = list_make1($1); }
| pub_obj_type_list ',' PublicationAllObjSpec
{ $$ = lappend($1, $3); }
;
/*****************************************************************************
*
* ALTER PUBLICATION name SET ( options )
@@ -19639,6 +19675,47 @@ parsePartitionStrategy(char *strategy, int location, core_yyscan_t yyscanner)
}
/*
* Process all_objects_list to set all_tables and/or all_sequences.
* Also, checks if the pub_object_type has been specified more than once.
*/
static void
preprocess_pub_all_objtype_list(List *all_objects_list, bool *all_tables,
bool *all_sequences, core_yyscan_t yyscanner)
{
if (!all_objects_list)
return;
*all_tables = false;
*all_sequences = false;
foreach_ptr(PublicationAllObjSpec, obj, all_objects_list)
{
if (obj->pubobjtype == PUBLICATION_ALL_TABLES)
{
if (*all_tables)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid publication object list"),
errdetail("ALL TABLES can be specified only once."),
parser_errposition(obj->location));
*all_tables = true;
}
else if (obj->pubobjtype == PUBLICATION_ALL_SEQUENCES)
{
if (*all_sequences)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid publication object list"),
errdetail("ALL SEQUENCES can be specified only once."),
parser_errposition(obj->location));
*all_sequences = true;
}
}
}
/*
* Process pubobjspec_list to check for errors in any of the objects and
* convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType.