mirror of
https://github.com/postgres/postgres.git
synced 2025-06-13 07:41:39 +03:00
ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
At present, if we want to update publications in a subscription, we can use SET PUBLICATION. However, it requires supplying all publications that exists and the new publications. If we want to add new publications, it's inconvenient. The new syntax only supplies the new publications. When the refresh is true, it only refreshes the new publications. Author: Japin Li <japinli@hotmail.com> Author: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Discussion: https://www.postgresql.org/message-id/flat/MEYP282MB166939D0D6C480B7FBE7EFFBB6BC0@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM
This commit is contained in:
@ -47,6 +47,8 @@
|
||||
#include "utils/syscache.h"
|
||||
|
||||
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
|
||||
static void check_duplicates_in_publist(List *publist, Datum *datums);
|
||||
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
|
||||
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
|
||||
|
||||
|
||||
@ -293,8 +295,6 @@ publicationListToArray(List *publist)
|
||||
{
|
||||
ArrayType *arr;
|
||||
Datum *datums;
|
||||
int j = 0;
|
||||
ListCell *cell;
|
||||
MemoryContext memcxt;
|
||||
MemoryContext oldcxt;
|
||||
|
||||
@ -306,28 +306,7 @@ publicationListToArray(List *publist)
|
||||
|
||||
datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
|
||||
|
||||
foreach(cell, publist)
|
||||
{
|
||||
char *name = strVal(lfirst(cell));
|
||||
ListCell *pcell;
|
||||
|
||||
/* Check for duplicates. */
|
||||
foreach(pcell, publist)
|
||||
{
|
||||
char *pname = strVal(lfirst(pcell));
|
||||
|
||||
if (pcell == cell)
|
||||
break;
|
||||
|
||||
if (strcmp(name, pname) == 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("publication name \"%s\" used more than once",
|
||||
pname)));
|
||||
}
|
||||
|
||||
datums[j++] = CStringGetTextDatum(name);
|
||||
}
|
||||
check_duplicates_in_publist(publist, datums);
|
||||
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
|
||||
@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
|
||||
update_tuple = true;
|
||||
break;
|
||||
|
||||
case ALTER_SUBSCRIPTION_PUBLICATION:
|
||||
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
|
||||
{
|
||||
bool copy_data;
|
||||
bool refresh;
|
||||
@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
|
||||
break;
|
||||
}
|
||||
|
||||
case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
|
||||
case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
|
||||
{
|
||||
bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
|
||||
bool copy_data;
|
||||
bool refresh;
|
||||
List *publist;
|
||||
|
||||
publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
|
||||
|
||||
parse_subscription_options(stmt->options,
|
||||
NULL, /* no "connect" */
|
||||
NULL, NULL, /* no "enabled" */
|
||||
NULL, /* no "create_slot" */
|
||||
NULL, NULL, /* no "slot_name" */
|
||||
isadd ? ©_data : NULL, /* for drop, no
|
||||
* "copy_data" */
|
||||
NULL, /* no "synchronous_commit" */
|
||||
&refresh,
|
||||
NULL, NULL, /* no "binary" */
|
||||
NULL, NULL); /* no "streaming" */
|
||||
|
||||
values[Anum_pg_subscription_subpublications - 1] =
|
||||
publicationListToArray(publist);
|
||||
replaces[Anum_pg_subscription_subpublications - 1] = true;
|
||||
|
||||
update_tuple = true;
|
||||
|
||||
/* Refresh if user asked us to. */
|
||||
if (refresh)
|
||||
{
|
||||
if (!sub->enabled)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
|
||||
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
|
||||
|
||||
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
|
||||
|
||||
/* Only refresh the added/dropped list of publications. */
|
||||
sub->publications = stmt->publication;
|
||||
|
||||
AlterSubscription_refresh(sub, copy_data);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case ALTER_SUBSCRIPTION_REFRESH:
|
||||
{
|
||||
bool copy_data;
|
||||
@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
|
||||
errhint("Use %s to disassociate the subscription from the slot.",
|
||||
"ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Check for duplicates in the given list of publications and error out if
|
||||
* found one. Add publications to datums as text datums, if datums is not
|
||||
* NULL.
|
||||
*/
|
||||
static void
|
||||
check_duplicates_in_publist(List *publist, Datum *datums)
|
||||
{
|
||||
ListCell *cell;
|
||||
int j = 0;
|
||||
|
||||
foreach(cell, publist)
|
||||
{
|
||||
char *name = strVal(lfirst(cell));
|
||||
ListCell *pcell;
|
||||
|
||||
foreach(pcell, publist)
|
||||
{
|
||||
char *pname = strVal(lfirst(pcell));
|
||||
|
||||
if (pcell == cell)
|
||||
break;
|
||||
|
||||
if (strcmp(name, pname) == 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("publication name \"%s\" used more than once",
|
||||
pname)));
|
||||
}
|
||||
|
||||
if (datums)
|
||||
datums[j++] = CStringGetTextDatum(name);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Merge current subscription's publications and user-specified publications
|
||||
* from ADD/DROP PUBLICATIONS.
|
||||
*
|
||||
* If addpub is true, we will add the list of publications into oldpublist.
|
||||
* Otherwise, we will delete the list of publications from oldpublist. The
|
||||
* returned list is a copy, oldpublist itself is not changed.
|
||||
*
|
||||
* subname is the subscription name, for error messages.
|
||||
*/
|
||||
static List *
|
||||
merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
|
||||
{
|
||||
ListCell *lc;
|
||||
|
||||
oldpublist = list_copy(oldpublist);
|
||||
|
||||
check_duplicates_in_publist(newpublist, NULL);
|
||||
|
||||
foreach(lc, newpublist)
|
||||
{
|
||||
char *name = strVal(lfirst(lc));
|
||||
ListCell *lc2;
|
||||
bool found = false;
|
||||
|
||||
foreach(lc2, oldpublist)
|
||||
{
|
||||
char *pubname = strVal(lfirst(lc2));
|
||||
|
||||
if (strcmp(name, pubname) == 0)
|
||||
{
|
||||
found = true;
|
||||
if (addpub)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DUPLICATE_OBJECT),
|
||||
errmsg("publication \"%s\" is already in subscription \"%s\"",
|
||||
name, subname)));
|
||||
else
|
||||
oldpublist = foreach_delete_current(oldpublist, lc2);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (addpub && !found)
|
||||
oldpublist = lappend(oldpublist, makeString(name));
|
||||
else if (!addpub && !found)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("publication \"%s\" is not in subscription \"%s\"",
|
||||
name, subname)));
|
||||
}
|
||||
|
||||
/*
|
||||
* XXX Probably no strong reason for this, but for now it's to make ALTER
|
||||
* SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
|
||||
*/
|
||||
if (!oldpublist)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
|
||||
errmsg("subscription must contain at least one publication")));
|
||||
|
||||
return oldpublist;
|
||||
}
|
||||
|
Reference in New Issue
Block a user