mirror of
https://github.com/postgres/postgres.git
synced 2025-11-06 07:49:08 +03:00
Rework the options syntax for logical replication commands
For CREATE/ALTER PUBLICATION/SUBSCRIPTION, use similar option style as other statements that use a WITH clause for options. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
This commit is contained in:
@@ -46,6 +46,7 @@
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/varlena.h"
|
||||
|
||||
/* Same as MAXNUMMESSAGES in sinvaladt.c */
|
||||
#define MAX_RELCACHE_INVAL_MSGS 4096
|
||||
@@ -58,18 +59,14 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
|
||||
|
||||
static void
|
||||
parse_publication_options(List *options,
|
||||
bool *publish_insert_given,
|
||||
bool *publish_given,
|
||||
bool *publish_insert,
|
||||
bool *publish_update_given,
|
||||
bool *publish_update,
|
||||
bool *publish_delete_given,
|
||||
bool *publish_delete)
|
||||
{
|
||||
ListCell *lc;
|
||||
|
||||
*publish_insert_given = false;
|
||||
*publish_update_given = false;
|
||||
*publish_delete_given = false;
|
||||
*publish_given = false;
|
||||
|
||||
/* Defaults are true */
|
||||
*publish_insert = true;
|
||||
@@ -81,68 +78,54 @@ parse_publication_options(List *options,
|
||||
{
|
||||
DefElem *defel = (DefElem *) lfirst(lc);
|
||||
|
||||
if (strcmp(defel->defname, "publish insert") == 0)
|
||||
if (strcmp(defel->defname, "publish") == 0)
|
||||
{
|
||||
if (*publish_insert_given)
|
||||
char *publish;
|
||||
List *publish_list;
|
||||
ListCell *lc;
|
||||
|
||||
if (*publish_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
*publish_insert_given = true;
|
||||
*publish_insert = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "nopublish insert") == 0)
|
||||
{
|
||||
if (*publish_insert_given)
|
||||
/*
|
||||
* If publish option was given only the explicitly listed actions
|
||||
* should be published.
|
||||
*/
|
||||
*publish_insert = false;
|
||||
*publish_update = false;
|
||||
*publish_delete = false;
|
||||
|
||||
*publish_given = true;
|
||||
publish = defGetString(defel);
|
||||
|
||||
if (!SplitIdentifierString(publish, ',', &publish_list))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
errmsg("invalid publish list")));
|
||||
|
||||
*publish_insert_given = true;
|
||||
*publish_insert = !defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "publish update") == 0)
|
||||
{
|
||||
if (*publish_update_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
/* Process the option list. */
|
||||
foreach (lc, publish_list)
|
||||
{
|
||||
char *publish_opt = (char *)lfirst(lc);
|
||||
|
||||
*publish_update_given = true;
|
||||
*publish_update = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "nopublish update") == 0)
|
||||
{
|
||||
if (*publish_update_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
*publish_update_given = true;
|
||||
*publish_update = !defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "publish delete") == 0)
|
||||
{
|
||||
if (*publish_delete_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
*publish_delete_given = true;
|
||||
*publish_delete = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "nopublish delete") == 0)
|
||||
{
|
||||
if (*publish_delete_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
*publish_delete_given = true;
|
||||
*publish_delete = !defGetBoolean(defel);
|
||||
if (strcmp(publish_opt, "insert") == 0)
|
||||
*publish_insert = true;
|
||||
else if (strcmp(publish_opt, "update") == 0)
|
||||
*publish_update = true;
|
||||
else if (strcmp(publish_opt, "delete") == 0)
|
||||
*publish_delete = true;
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
|
||||
}
|
||||
}
|
||||
else
|
||||
elog(ERROR, "unrecognized option: %s", defel->defname);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("unrecognized publication parameter: %s", defel->defname)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,9 +141,7 @@ CreatePublication(CreatePublicationStmt *stmt)
|
||||
bool nulls[Natts_pg_publication];
|
||||
Datum values[Natts_pg_publication];
|
||||
HeapTuple tup;
|
||||
bool publish_insert_given;
|
||||
bool publish_update_given;
|
||||
bool publish_delete_given;
|
||||
bool publish_given;
|
||||
bool publish_insert;
|
||||
bool publish_update;
|
||||
bool publish_delete;
|
||||
@@ -199,9 +180,8 @@ CreatePublication(CreatePublicationStmt *stmt)
|
||||
values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
|
||||
|
||||
parse_publication_options(stmt->options,
|
||||
&publish_insert_given, &publish_insert,
|
||||
&publish_update_given, &publish_update,
|
||||
&publish_delete_given, &publish_delete);
|
||||
&publish_given, &publish_insert,
|
||||
&publish_update, &publish_delete);
|
||||
|
||||
values[Anum_pg_publication_puballtables - 1] =
|
||||
BoolGetDatum(stmt->for_all_tables);
|
||||
@@ -253,40 +233,30 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
|
||||
bool nulls[Natts_pg_publication];
|
||||
bool replaces[Natts_pg_publication];
|
||||
Datum values[Natts_pg_publication];
|
||||
bool publish_insert_given;
|
||||
bool publish_update_given;
|
||||
bool publish_delete_given;
|
||||
bool publish_given;
|
||||
bool publish_insert;
|
||||
bool publish_update;
|
||||
bool publish_delete;
|
||||
ObjectAddress obj;
|
||||
|
||||
parse_publication_options(stmt->options,
|
||||
&publish_insert_given, &publish_insert,
|
||||
&publish_update_given, &publish_update,
|
||||
&publish_delete_given, &publish_delete);
|
||||
&publish_given, &publish_insert,
|
||||
&publish_update, &publish_delete);
|
||||
|
||||
/* Everything ok, form a new tuple. */
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, false, sizeof(nulls));
|
||||
memset(replaces, false, sizeof(replaces));
|
||||
|
||||
if (publish_insert_given)
|
||||
if (publish_given)
|
||||
{
|
||||
values[Anum_pg_publication_pubinsert - 1] =
|
||||
BoolGetDatum(publish_insert);
|
||||
values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
|
||||
replaces[Anum_pg_publication_pubinsert - 1] = true;
|
||||
}
|
||||
if (publish_update_given)
|
||||
{
|
||||
values[Anum_pg_publication_pubupdate - 1] =
|
||||
BoolGetDatum(publish_update);
|
||||
|
||||
values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
|
||||
replaces[Anum_pg_publication_pubupdate - 1] = true;
|
||||
}
|
||||
if (publish_delete_given)
|
||||
{
|
||||
values[Anum_pg_publication_pubdelete - 1] =
|
||||
BoolGetDatum(publish_delete);
|
||||
|
||||
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
|
||||
replaces[Anum_pg_publication_pubdelete - 1] = true;
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
||||
{
|
||||
DefElem *defel = (DefElem *) lfirst(lc);
|
||||
|
||||
if (strcmp(defel->defname, "noconnect") == 0 && connect)
|
||||
if (strcmp(defel->defname, "connect") == 0 && connect)
|
||||
{
|
||||
if (connect_given)
|
||||
ereport(ERROR,
|
||||
@@ -101,7 +101,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
connect_given = true;
|
||||
*connect = !defGetBoolean(defel);
|
||||
*connect = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "enabled") == 0 && enabled)
|
||||
{
|
||||
@@ -113,17 +113,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
||||
*enabled_given = true;
|
||||
*enabled = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "disabled") == 0 && enabled)
|
||||
{
|
||||
if (*enabled_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
*enabled_given = true;
|
||||
*enabled = !defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "create slot") == 0 && create_slot)
|
||||
else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
|
||||
{
|
||||
if (create_slot_given)
|
||||
ereport(ERROR,
|
||||
@@ -133,17 +123,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
||||
create_slot_given = true;
|
||||
*create_slot = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "nocreate slot") == 0 && create_slot)
|
||||
{
|
||||
if (create_slot_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
create_slot_given = true;
|
||||
*create_slot = !defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "slot name") == 0 && slot_name)
|
||||
else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
|
||||
{
|
||||
if (*slot_name_given)
|
||||
ereport(ERROR,
|
||||
@@ -157,7 +137,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
||||
if (strcmp(*slot_name, "none") == 0)
|
||||
*slot_name = NULL;
|
||||
}
|
||||
else if (strcmp(defel->defname, "copy data") == 0 && copy_data)
|
||||
else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
|
||||
{
|
||||
if (copy_data_given)
|
||||
ereport(ERROR,
|
||||
@@ -167,16 +147,6 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
||||
copy_data_given = true;
|
||||
*copy_data = defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "nocopy data") == 0 && copy_data)
|
||||
{
|
||||
if (copy_data_given)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
copy_data_given = true;
|
||||
*copy_data = !defGetBoolean(defel);
|
||||
}
|
||||
else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
|
||||
synchronous_commit)
|
||||
{
|
||||
@@ -336,7 +306,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
||||
* replication slot.
|
||||
*/
|
||||
if (create_slot)
|
||||
PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... CREATE SLOT");
|
||||
PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
|
||||
Reference in New Issue
Block a user