diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b862e59f1da..eb88d877a50 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -46,6 +46,41 @@ #include "utils/memutils.h" #include "utils/syscache.h" +/* + * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION + * command. + */ +#define SUBOPT_CONNECT 0x00000001 +#define SUBOPT_ENABLED 0x00000002 +#define SUBOPT_CREATE_SLOT 0x00000004 +#define SUBOPT_SLOT_NAME 0x00000008 +#define SUBOPT_COPY_DATA 0x00000010 +#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020 +#define SUBOPT_REFRESH 0x00000040 +#define SUBOPT_BINARY 0x00000080 +#define SUBOPT_STREAMING 0x00000100 + +/* check if the 'val' has 'bits' set */ +#define IsSet(val, bits) (((val) & (bits)) == (bits)) + +/* + * Structure to hold a bitmap representing the user-provided CREATE/ALTER + * SUBSCRIPTION command options and the parsed/default values of each of them. + */ +typedef struct SubOpts +{ + bits32 specified_opts; + char *slot_name; + char *synchronous_commit; + bool connect; + bool enabled; + bool create_slot; + bool copy_data; + bool refresh; + bool binary; + bool streaming; +} SubOpts; + static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); @@ -56,164 +91,151 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. * * Since not all options can be specified in both commands, this function - * will report an error on options if the target output pointer is NULL to - * accommodate that. + * will report an error if mutually exclusive options are specified. + * + * Caller is expected to have cleared 'opts'. */ static void -parse_subscription_options(List *options, - bool *connect, - bool *enabled_given, bool *enabled, - bool *create_slot, - bool *slot_name_given, char **slot_name, - bool *copy_data, - char **synchronous_commit, - bool *refresh, - bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) +parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts) { ListCell *lc; - bool connect_given = false; - bool create_slot_given = false; - bool copy_data_given = false; - bool refresh_given = false; - /* If connect is specified, the others also need to be. */ - Assert(!connect || (enabled && create_slot && copy_data)); + /* caller must expect some option */ + Assert(supported_opts != 0); - if (connect) - *connect = true; - if (enabled) - { - *enabled_given = false; - *enabled = true; - } - if (create_slot) - *create_slot = true; - if (slot_name) - { - *slot_name_given = false; - *slot_name = NULL; - } - if (copy_data) - *copy_data = true; - if (synchronous_commit) - *synchronous_commit = NULL; - if (refresh) - *refresh = true; - if (binary) - { - *binary_given = false; - *binary = false; - } - if (streaming) - { - *streaming_given = false; - *streaming = false; - } + /* If connect option is supported, these others also need to be. */ + Assert(!IsSet(supported_opts, SUBOPT_CONNECT) || + IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | + SUBOPT_COPY_DATA)); + + /* Set default values for the boolean supported options. */ + if (IsSet(supported_opts, SUBOPT_CONNECT)) + opts->connect = true; + if (IsSet(supported_opts, SUBOPT_ENABLED)) + opts->enabled = true; + if (IsSet(supported_opts, SUBOPT_CREATE_SLOT)) + opts->create_slot = true; + if (IsSet(supported_opts, SUBOPT_COPY_DATA)) + opts->copy_data = true; + if (IsSet(supported_opts, SUBOPT_REFRESH)) + opts->refresh = true; + if (IsSet(supported_opts, SUBOPT_BINARY)) + opts->binary = false; + if (IsSet(supported_opts, SUBOPT_STREAMING)) + opts->streaming = false; /* Parse options */ - foreach(lc, options) + foreach(lc, stmt_options) { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "connect") == 0 && connect) + if (IsSet(supported_opts, SUBOPT_CONNECT) && + strcmp(defel->defname, "connect") == 0) { - if (connect_given) + if (IsSet(opts->specified_opts, SUBOPT_CONNECT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - connect_given = true; - *connect = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_CONNECT; + opts->connect = defGetBoolean(defel); } - else if (strcmp(defel->defname, "enabled") == 0 && enabled) + else if (IsSet(supported_opts, SUBOPT_ENABLED) && + strcmp(defel->defname, "enabled") == 0) { - if (*enabled_given) + if (IsSet(opts->specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *enabled_given = true; - *enabled = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_ENABLED; + opts->enabled = defGetBoolean(defel); } - else if (strcmp(defel->defname, "create_slot") == 0 && create_slot) + else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + strcmp(defel->defname, "create_slot") == 0) { - if (create_slot_given) + if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - create_slot_given = true; - *create_slot = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_CREATE_SLOT; + opts->create_slot = defGetBoolean(defel); } - else if (strcmp(defel->defname, "slot_name") == 0 && slot_name) + else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) && + strcmp(defel->defname, "slot_name") == 0) { - if (*slot_name_given) + if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *slot_name_given = true; - *slot_name = defGetString(defel); + opts->specified_opts |= SUBOPT_SLOT_NAME; + opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ - if (strcmp(*slot_name, "none") == 0) - *slot_name = NULL; + if (strcmp(opts->slot_name, "none") == 0) + opts->slot_name = NULL; } - else if (strcmp(defel->defname, "copy_data") == 0 && copy_data) + else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && + strcmp(defel->defname, "copy_data") == 0) { - if (copy_data_given) + if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - copy_data_given = true; - *copy_data = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_COPY_DATA; + opts->copy_data = defGetBoolean(defel); } - else if (strcmp(defel->defname, "synchronous_commit") == 0 && - synchronous_commit) + else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && + strcmp(defel->defname, "synchronous_commit") == 0) { - if (*synchronous_commit) + if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *synchronous_commit = defGetString(defel); + opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts->synchronous_commit = defGetString(defel); /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", *synchronous_commit, + (void) set_config_option("synchronous_commit", opts->synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } - else if (strcmp(defel->defname, "refresh") == 0 && refresh) + else if (IsSet(supported_opts, SUBOPT_REFRESH) && + strcmp(defel->defname, "refresh") == 0) { - if (refresh_given) + if (IsSet(opts->specified_opts, SUBOPT_REFRESH)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - refresh_given = true; - *refresh = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_REFRESH; + opts->refresh = defGetBoolean(defel); } - else if (strcmp(defel->defname, "binary") == 0 && binary) + else if (IsSet(supported_opts, SUBOPT_BINARY) && + strcmp(defel->defname, "binary") == 0) { - if (*binary_given) + if (IsSet(opts->specified_opts, SUBOPT_BINARY)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *binary_given = true; - *binary = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_BINARY; + opts->binary = defGetBoolean(defel); } - else if (strcmp(defel->defname, "streaming") == 0 && streaming) + else if (IsSet(supported_opts, SUBOPT_STREAMING) && + strcmp(defel->defname, "streaming") == 0) { - if (*streaming_given) + if (IsSet(opts->specified_opts, SUBOPT_STREAMING)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *streaming_given = true; - *streaming = defGetBoolean(defel); + opts->specified_opts |= SUBOPT_STREAMING; + opts->streaming = defGetBoolean(defel); } else ereport(ERROR, @@ -225,63 +247,81 @@ parse_subscription_options(List *options, * We've been explicitly asked to not connect, that requires some * additional processing. */ - if (connect && !*connect) + if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT)) { /* Check for incompatible options from the user. */ - if (enabled && *enabled_given && *enabled) + if (opts->enabled && + IsSet(supported_opts, SUBOPT_ENABLED) && + IsSet(opts->specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", "connect = false", "enabled = true"))); - if (create_slot && create_slot_given && *create_slot) + if (opts->create_slot && + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "connect = false", "create_slot = true"))); - if (copy_data && copy_data_given && *copy_data) + if (opts->copy_data && + IsSet(supported_opts, SUBOPT_COPY_DATA) && + IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); /* Change the defaults of other options. */ - *enabled = false; - *create_slot = false; - *copy_data = false; + opts->enabled = false; + opts->create_slot = false; + opts->copy_data = false; } /* * Do additional checking for disallowed combination when slot_name = NONE * was used. */ - if (slot_name && *slot_name_given && !*slot_name) + if (!opts->slot_name && + IsSet(supported_opts, SUBOPT_SLOT_NAME) && + IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) { - if (enabled && *enabled_given && *enabled) + if (opts->enabled && + IsSet(supported_opts, SUBOPT_ENABLED) && + IsSet(opts->specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", "slot_name = NONE", "enabled = true"))); - if (create_slot && create_slot_given && *create_slot) + if (opts->create_slot && + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", "slot_name = NONE", "create_slot = true"))); - if (enabled && !*enabled_given && *enabled) + if (opts->enabled && + IsSet(supported_opts, SUBOPT_ENABLED) && + !IsSet(opts->specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("subscription with %s must also set %s", "slot_name = NONE", "enabled = false"))); - if (create_slot && !create_slot_given && *create_slot) + if (opts->create_slot && + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + !IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } @@ -331,37 +371,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; - bool connect; - bool enabled_given; - bool enabled; - bool copy_data; - bool streaming; - bool streaming_given; - char *synchronous_commit; char *conninfo; - char *slotname; - bool slotname_given; - bool binary; - bool binary_given; char originname[NAMEDATALEN]; - bool create_slot; List *publications; + bits32 supported_opts; + SubOpts opts = {0}; /* * Parse and check options. * * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, - &connect, - &enabled_given, &enabled, - &create_slot, - &slotname_given, &slotname, - ©_data, - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); + supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | + SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | + SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | + SUBOPT_STREAMING); + parse_subscription_options(stmt->options, supported_opts, &opts); /* * Since creating a replication slot is not transactional, rolling back @@ -369,7 +394,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * CREATE SUBSCRIPTION inside a transaction block if creating a * replication slot. */ - if (create_slot) + if (opts.create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)"); if (!superuser()) @@ -399,12 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (!slotname_given && slotname == NULL) - slotname = stmt->subname; + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && + opts.slot_name == NULL) + opts.slot_name = stmt->subname; /* The default for synchronous_commit of subscriptions is off. */ - if (synchronous_commit == NULL) - synchronous_commit = "off"; + if (opts.synchronous_commit == NULL) + opts.synchronous_commit = "off"; conninfo = stmt->conninfo; publications = stmt->publication; @@ -426,18 +452,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); - values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); - values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - if (slotname) + if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); else nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = - CStringGetTextDatum(synchronous_commit); + CStringGetTextDatum(opts.synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); @@ -456,7 +482,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connect to remote side to execute requested commands and fetch table * info. */ - if (connect) + if (opts.connect) { char *err; WalReceiverConn *wrconn; @@ -477,7 +503,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Set sync state based on if we were asked to do data copy or * not. */ - table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* * Get the table list from publisher and build local table status @@ -504,15 +530,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * won't use the initial snapshot for anything, so no need to * export it. */ - if (create_slot) + if (opts.create_slot) { - Assert(slotname); + Assert(opts.slot_name); - walrcv_create_slot(wrconn, slotname, false, + walrcv_create_slot(wrconn, opts.slot_name, false, CRS_NOEXPORT_SNAPSHOT, NULL); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", - slotname))); + opts.slot_name))); } } PG_FINALLY(); @@ -529,7 +555,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) table_close(rel, RowExclusiveLock); - if (enabled) + if (opts.enabled) ApplyLauncherWakeupAtCommit(); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -764,6 +790,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) bool update_tuple = false; Subscription *sub; Form_pg_subscription form; + bits32 supported_opts; + SubOpts opts = {0}; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -799,59 +827,46 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slotname; - bool slotname_given; - char *synchronous_commit; - bool binary_given; - bool binary; - bool streaming_given; - bool streaming; + supported_opts = (SUBOPT_SLOT_NAME | + SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | + SUBOPT_STREAMING); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - &slotname_given, &slotname, - NULL, /* no "copy_data" */ - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); + parse_subscription_options(stmt->options, supported_opts, &opts); - if (slotname_given) + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { - if (sub->enabled && !slotname) + if (sub->enabled && !opts.slot_name) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot set %s for enabled subscription", "slot_name = NONE"))); - if (slotname) + if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); else nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } - if (synchronous_commit) + if (opts.synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = - CStringGetTextDatum(synchronous_commit); + CStringGetTextDatum(opts.synchronous_commit); replaces[Anum_pg_subscription_subsynccommit - 1] = true; } - if (binary_given) + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) { values[Anum_pg_subscription_subbinary - 1] = - BoolGetDatum(binary); + BoolGetDatum(opts.binary); replaces[Anum_pg_subscription_subbinary - 1] = true; } - if (streaming_given) + if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { values[Anum_pg_subscription_substream - 1] = - BoolGetDatum(streaming); + BoolGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; } @@ -861,31 +876,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_ENABLED: { - bool enabled, - enabled_given; + parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts); + Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - &enabled_given, &enabled, - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - NULL, /* no "copy_data" */ - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ - Assert(enabled_given); - - if (!sub->slotname && enabled) + if (!sub->slotname && opts.enabled) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); values[Anum_pg_subscription_subenabled - 1] = - BoolGetDatum(enabled); + BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; - if (enabled) + if (opts.enabled) ApplyLauncherWakeupAtCommit(); update_tuple = true; @@ -906,19 +909,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - bool copy_data; - bool refresh; + supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; + parse_subscription_options(stmt->options, supported_opts, &opts); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - ©_data, - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -926,7 +919,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; /* Refresh if user asked us to. */ - if (refresh) + if (opts.refresh) { if (!sub->enabled) ereport(ERROR, @@ -939,7 +932,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); } break; @@ -948,25 +941,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_ADD_PUBLICATION: case ALTER_SUBSCRIPTION_DROP_PUBLICATION: { - bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - bool copy_data = false; - bool refresh; List *publist; + bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - isadd ? ©_data : NULL, /* for drop, no - * "copy_data" */ - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + supported_opts = SUBOPT_REFRESH; + if (isadd) + supported_opts |= SUBOPT_COPY_DATA; + + parse_subscription_options(stmt->options, supported_opts, &opts); publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); - values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publist); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -974,7 +958,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; /* Refresh if user asked us to. */ - if (refresh) + if (opts.refresh) { if (!sub->enabled) ereport(ERROR, @@ -987,7 +971,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Only refresh the added/dropped list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); } break; @@ -995,27 +979,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_REFRESH: { - bool copy_data; - if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - ©_data, - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); break; } diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 64c06cf9523..a72d53a272f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2511,6 +2511,7 @@ StringInfoData StripnullState SubLink SubLinkType +SubOpts SubPlan SubPlanState SubRemoveRels