diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 5aed2694350..367ac814f4b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -23,6 +23,8 @@ PostgreSQL documentation ALTER SUBSCRIPTION name CONNECTION 'conninfo' ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( set_publication_option [= value] [, ... ] ) ] +ALTER SUBSCRIPTION name ADD PUBLICATION publication_name [, ...] [ WITH ( set_publication_option [= value] [, ... ] ) ] +ALTER SUBSCRIPTION name DROP PUBLICATION publication_name [, ...] [ WITH ( set_publication_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE @@ -63,7 +65,7 @@ ALTER SUBSCRIPTION name RENAME TO < Commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and - ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh + ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true cannot be executed inside a transaction block. @@ -94,12 +96,19 @@ ALTER SUBSCRIPTION name RENAME TO < SET PUBLICATION publication_name + ADD PUBLICATION publication_name + DROP PUBLICATION publication_name - Changes list of subscribed publications. See - for more information. - By default this command will also act like REFRESH - PUBLICATION. + Changes the list of subscribed publications. SET + replaces the entire list of publications with a new list, + ADD adds additional publications, + DROP removes publications from the list of + publications. See for more + information. By default, this command will also act like + REFRESH PUBLICATION, except that in case of + ADD or DROP, only the added or + dropped publications are refreshed. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5282b797359..517c8edd3b2 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -47,6 +47,8 @@ #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_duplicates_in_publist(List *publist, Datum *datums); +static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -293,8 +295,6 @@ publicationListToArray(List *publist) { ArrayType *arr; Datum *datums; - int j = 0; - ListCell *cell; MemoryContext memcxt; MemoryContext oldcxt; @@ -306,28 +306,7 @@ publicationListToArray(List *publist) datums = (Datum *) palloc(sizeof(Datum) * list_length(publist)); - foreach(cell, publist) - { - char *name = strVal(lfirst(cell)); - ListCell *pcell; - - /* Check for duplicates. */ - foreach(pcell, publist) - { - char *pname = strVal(lfirst(pcell)); - - if (pcell == cell) - break; - - if (strcmp(name, pname) == 0) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("publication name \"%s\" used more than once", - pname))); - } - - datums[j++] = CStringGetTextDatum(name); - } + check_duplicates_in_publist(publist, datums); MemoryContextSwitchTo(oldcxt); @@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; break; - case ALTER_SUBSCRIPTION_PUBLICATION: + case ALTER_SUBSCRIPTION_SET_PUBLICATION: { bool copy_data; bool refresh; @@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) break; } + case ALTER_SUBSCRIPTION_ADD_PUBLICATION: + case ALTER_SUBSCRIPTION_DROP_PUBLICATION: + { + bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; + bool copy_data; + bool refresh; + List *publist; + + publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); + + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + isadd ? ©_data : NULL, /* for drop, no + * "copy_data" */ + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ + + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(publist); + replaces[Anum_pg_subscription_subpublications - 1] = true; + + update_tuple = true; + + /* Refresh if user asked us to. */ + if (refresh) + { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); + + /* Only refresh the added/dropped list of publications. */ + sub->publications = stmt->publication; + + AlterSubscription_refresh(sub, copy_data); + } + + break; + } + case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; @@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) errhint("Use %s to disassociate the subscription from the slot.", "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); } + +/* + * Check for duplicates in the given list of publications and error out if + * found one. Add publications to datums as text datums, if datums is not + * NULL. + */ +static void +check_duplicates_in_publist(List *publist, Datum *datums) +{ + ListCell *cell; + int j = 0; + + foreach(cell, publist) + { + char *name = strVal(lfirst(cell)); + ListCell *pcell; + + foreach(pcell, publist) + { + char *pname = strVal(lfirst(pcell)); + + if (pcell == cell) + break; + + if (strcmp(name, pname) == 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication name \"%s\" used more than once", + pname))); + } + + if (datums) + datums[j++] = CStringGetTextDatum(name); + } +} + +/* + * Merge current subscription's publications and user-specified publications + * from ADD/DROP PUBLICATIONS. + * + * If addpub is true, we will add the list of publications into oldpublist. + * Otherwise, we will delete the list of publications from oldpublist. The + * returned list is a copy, oldpublist itself is not changed. + * + * subname is the subscription name, for error messages. + */ +static List * +merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname) +{ + ListCell *lc; + + oldpublist = list_copy(oldpublist); + + check_duplicates_in_publist(newpublist, NULL); + + foreach(lc, newpublist) + { + char *name = strVal(lfirst(lc)); + ListCell *lc2; + bool found = false; + + foreach(lc2, oldpublist) + { + char *pubname = strVal(lfirst(lc2)); + + if (strcmp(name, pubname) == 0) + { + found = true; + if (addpub) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("publication \"%s\" is already in subscription \"%s\"", + name, subname))); + else + oldpublist = foreach_delete_current(oldpublist, lc2); + + break; + } + } + + if (addpub && !found) + oldpublist = lappend(oldpublist, makeString(name)); + else if (!addpub && !found) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication \"%s\" is not in subscription \"%s\"", + name, subname))); + } + + /* + * XXX Probably no strong reason for this, but for now it's to make ALTER + * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION. + */ + if (!oldpublist) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("subscription must contain at least one publication"))); + + return oldpublist; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 05cc2c9ae0d..38c36a49360 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9687,11 +9687,31 @@ AlterSubscriptionStmt: n->options = $6; $$ = (Node *)n; } + | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; + $$ = (Node *)n; + } | ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_PUBLICATION; + n->kind = ALTER_SUBSCRIPTION_SET_PUBLICATION; n->subname = $3; n->publication = $6; n->options = $7; diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index a053bc1e45d..832bcdfc3bf 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1652,7 +1652,8 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny)) COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO", - "RENAME TO", "REFRESH PUBLICATION", "SET"); + "RENAME TO", "REFRESH PUBLICATION", "SET", + "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("REFRESH", "PUBLICATION")) @@ -1672,14 +1673,15 @@ psql_completion(const char *text, int start, int end) { /* complete with nothing here as this refers to remote publications */ } - /* ALTER SUBSCRIPTION SET PUBLICATION */ + /* ALTER SUBSCRIPTION ADD|DROP|SET PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && - TailMatches("SET", "PUBLICATION", MatchAny)) + TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny)) COMPLETE_WITH("WITH ("); - /* ALTER SUBSCRIPTION SET PUBLICATION WITH ( */ + /* ALTER SUBSCRIPTION ADD|DROP|SET PUBLICATION WITH ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && - TailMatches("SET", "PUBLICATION", MatchAny, "WITH", "(")) + TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "(")) COMPLETE_WITH("copy_data", "refresh"); + /* ALTER SCHEMA */ else if (Matches("ALTER", "SCHEMA", MatchAny)) COMPLETE_WITH("OWNER TO", "RENAME TO"); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 97b80dfd210..807fbaceaac 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3618,7 +3618,9 @@ typedef enum AlterSubscriptionType { ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, - ALTER_SUBSCRIPTION_PUBLICATION, + ALTER_SUBSCRIPTION_SET_PUBLICATION, + ALTER_SUBSCRIPTION_ADD_PUBLICATION, + ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_ENABLED } AlterSubscriptionType; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 14a430221d6..09576c176b6 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -200,6 +200,45 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist (1 row) +-- fail - publication already exists +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false); +ERROR: publication "testpub" is already in subscription "regress_testsub" +-- fail - publication used more than once +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false); +ERROR: publication name "testpub1" used more than once +-- ok - add two publications into subscription +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); +-- fail - publications already exist +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); +ERROR: publication "testpub1" is already in subscription "regress_testsub" +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist +(1 row) + +-- fail - publication used more then once +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false); +ERROR: publication name "testpub1" used more than once +-- fail - all publications are deleted +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false); +ERROR: subscription must contain at least one publication +-- fail - publication does not exist in subscription +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false); +ERROR: publication "testpub3" is not in subscription "regress_testsub" +-- fail - do not support copy_data option +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true); +ERROR: unrecognized subscription parameter: "copy_data" +-- ok - delete publications +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist +(1 row) + DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub WITH (connect = false, create_slot = false, copy_data = false); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 81e65e5e642..308c098c144 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -145,6 +145,37 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ +-- fail - publication already exists +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false); + +-- fail - publication used more than once +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false); + +-- ok - add two publications into subscription +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); + +-- fail - publications already exist +ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); + +\dRs+ + +-- fail - publication used more then once +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false); + +-- fail - all publications are deleted +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false); + +-- fail - publication does not exist in subscription +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false); + +-- fail - do not support copy_data option +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true); + +-- ok - delete publications +ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); + +\dRs+ + DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub