mirror of
https://github.com/postgres/postgres.git
synced 2025-05-01 01:04:50 +03:00
Rework some code handling pg_subscription data in psql and pg_dump
This commit fixes some inconsistencies found in the frontend code when dealing with subscription catalog data. The following changes are done: - pg_subscription.h gains a EXPOSE_TO_CLIENT_CODE, so as more content defined in pg_subscription.h becomes available in pg_subscription_d.h for the frontend. - In psql's describe.c, substream can be switched to use CppAsString2() with its three LOGICALREP_STREAM_* values, with pg_subscription_d.h included. - pg_dump.c included pg_subscription.h, which is a header that should only be used in the backend code. The code is updated to use pg_subscription_d.h instead. - pg_dump stored all the data from pg_subscription in SubscriptionInfo with only strings, and a good chunk of them are boolean and char values. Using strings is not necessary, complicates the code (see for example two_phase_disabled[] removed here), and is inconsistent with the way other catalogs' data is handled. The fields of SubscriptionInfo are reordered to match with the order in its catalog, while on it. Reviewed-by: Hayato Kuroda Discussion: https://postgr.es/m/Z0lB2kp0ksHgmVuk@paquier.xyz
This commit is contained in:
parent
75818b3afb
commit
08691ea958
@ -50,7 +50,7 @@
|
|||||||
#include "catalog/pg_default_acl_d.h"
|
#include "catalog/pg_default_acl_d.h"
|
||||||
#include "catalog/pg_largeobject_d.h"
|
#include "catalog/pg_largeobject_d.h"
|
||||||
#include "catalog/pg_proc_d.h"
|
#include "catalog/pg_proc_d.h"
|
||||||
#include "catalog/pg_subscription.h"
|
#include "catalog/pg_subscription_d.h"
|
||||||
#include "catalog/pg_type_d.h"
|
#include "catalog/pg_type_d.h"
|
||||||
#include "common/connect.h"
|
#include "common/connect.h"
|
||||||
#include "common/int.h"
|
#include "common/int.h"
|
||||||
@ -4968,20 +4968,20 @@ getSubscriptions(Archive *fout)
|
|||||||
i_oid = PQfnumber(res, "oid");
|
i_oid = PQfnumber(res, "oid");
|
||||||
i_subname = PQfnumber(res, "subname");
|
i_subname = PQfnumber(res, "subname");
|
||||||
i_subowner = PQfnumber(res, "subowner");
|
i_subowner = PQfnumber(res, "subowner");
|
||||||
|
i_subenabled = PQfnumber(res, "subenabled");
|
||||||
i_subbinary = PQfnumber(res, "subbinary");
|
i_subbinary = PQfnumber(res, "subbinary");
|
||||||
i_substream = PQfnumber(res, "substream");
|
i_substream = PQfnumber(res, "substream");
|
||||||
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
|
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
|
||||||
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
|
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
|
||||||
i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
|
i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
|
||||||
i_subrunasowner = PQfnumber(res, "subrunasowner");
|
i_subrunasowner = PQfnumber(res, "subrunasowner");
|
||||||
|
i_subfailover = PQfnumber(res, "subfailover");
|
||||||
i_subconninfo = PQfnumber(res, "subconninfo");
|
i_subconninfo = PQfnumber(res, "subconninfo");
|
||||||
i_subslotname = PQfnumber(res, "subslotname");
|
i_subslotname = PQfnumber(res, "subslotname");
|
||||||
i_subsynccommit = PQfnumber(res, "subsynccommit");
|
i_subsynccommit = PQfnumber(res, "subsynccommit");
|
||||||
i_subpublications = PQfnumber(res, "subpublications");
|
i_subpublications = PQfnumber(res, "subpublications");
|
||||||
i_suborigin = PQfnumber(res, "suborigin");
|
i_suborigin = PQfnumber(res, "suborigin");
|
||||||
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
|
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
|
||||||
i_subenabled = PQfnumber(res, "subenabled");
|
|
||||||
i_subfailover = PQfnumber(res, "subfailover");
|
|
||||||
|
|
||||||
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
|
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
|
||||||
|
|
||||||
@ -4995,18 +4995,20 @@ getSubscriptions(Archive *fout)
|
|||||||
subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
|
subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
|
||||||
subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
|
subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
|
||||||
|
|
||||||
|
subinfo[i].subenabled =
|
||||||
|
(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
|
||||||
subinfo[i].subbinary =
|
subinfo[i].subbinary =
|
||||||
pg_strdup(PQgetvalue(res, i, i_subbinary));
|
(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
|
||||||
subinfo[i].substream =
|
subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
|
||||||
pg_strdup(PQgetvalue(res, i, i_substream));
|
subinfo[i].subtwophasestate = *(PQgetvalue(res, i, i_subtwophasestate));
|
||||||
subinfo[i].subtwophasestate =
|
|
||||||
pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
|
|
||||||
subinfo[i].subdisableonerr =
|
subinfo[i].subdisableonerr =
|
||||||
pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
|
(strcmp(PQgetvalue(res, i, i_subdisableonerr), "t") == 0);
|
||||||
subinfo[i].subpasswordrequired =
|
subinfo[i].subpasswordrequired =
|
||||||
pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
|
(strcmp(PQgetvalue(res, i, i_subpasswordrequired), "t") == 0);
|
||||||
subinfo[i].subrunasowner =
|
subinfo[i].subrunasowner =
|
||||||
pg_strdup(PQgetvalue(res, i, i_subrunasowner));
|
(strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
|
||||||
|
subinfo[i].subfailover =
|
||||||
|
(strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
|
||||||
subinfo[i].subconninfo =
|
subinfo[i].subconninfo =
|
||||||
pg_strdup(PQgetvalue(res, i, i_subconninfo));
|
pg_strdup(PQgetvalue(res, i, i_subconninfo));
|
||||||
if (PQgetisnull(res, i, i_subslotname))
|
if (PQgetisnull(res, i, i_subslotname))
|
||||||
@ -5024,10 +5026,6 @@ getSubscriptions(Archive *fout)
|
|||||||
else
|
else
|
||||||
subinfo[i].suboriginremotelsn =
|
subinfo[i].suboriginremotelsn =
|
||||||
pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
|
pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
|
||||||
subinfo[i].subenabled =
|
|
||||||
pg_strdup(PQgetvalue(res, i, i_subenabled));
|
|
||||||
subinfo[i].subfailover =
|
|
||||||
pg_strdup(PQgetvalue(res, i, i_subfailover));
|
|
||||||
|
|
||||||
/* Decide whether we want to dump it */
|
/* Decide whether we want to dump it */
|
||||||
selectDumpableObject(&(subinfo[i].dobj), fout);
|
selectDumpableObject(&(subinfo[i].dobj), fout);
|
||||||
@ -5208,7 +5206,6 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
|
|||||||
char **pubnames = NULL;
|
char **pubnames = NULL;
|
||||||
int npubnames = 0;
|
int npubnames = 0;
|
||||||
int i;
|
int i;
|
||||||
char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
|
|
||||||
|
|
||||||
/* Do nothing if not dumping schema */
|
/* Do nothing if not dumping schema */
|
||||||
if (!dopt->dumpSchema)
|
if (!dopt->dumpSchema)
|
||||||
@ -5245,29 +5242,29 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
|
|||||||
else
|
else
|
||||||
appendPQExpBufferStr(query, "NONE");
|
appendPQExpBufferStr(query, "NONE");
|
||||||
|
|
||||||
if (strcmp(subinfo->subbinary, "t") == 0)
|
if (subinfo->subbinary)
|
||||||
appendPQExpBufferStr(query, ", binary = true");
|
appendPQExpBufferStr(query, ", binary = true");
|
||||||
|
|
||||||
if (strcmp(subinfo->substream, "t") == 0)
|
if (subinfo->substream == LOGICALREP_STREAM_ON)
|
||||||
appendPQExpBufferStr(query, ", streaming = on");
|
appendPQExpBufferStr(query, ", streaming = on");
|
||||||
else if (strcmp(subinfo->substream, "p") == 0)
|
else if (subinfo->substream == LOGICALREP_STREAM_PARALLEL)
|
||||||
appendPQExpBufferStr(query, ", streaming = parallel");
|
appendPQExpBufferStr(query, ", streaming = parallel");
|
||||||
else
|
else
|
||||||
appendPQExpBufferStr(query, ", streaming = off");
|
appendPQExpBufferStr(query, ", streaming = off");
|
||||||
|
|
||||||
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
|
if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
|
||||||
appendPQExpBufferStr(query, ", two_phase = on");
|
appendPQExpBufferStr(query, ", two_phase = on");
|
||||||
|
|
||||||
if (strcmp(subinfo->subdisableonerr, "t") == 0)
|
if (subinfo->subdisableonerr)
|
||||||
appendPQExpBufferStr(query, ", disable_on_error = true");
|
appendPQExpBufferStr(query, ", disable_on_error = true");
|
||||||
|
|
||||||
if (strcmp(subinfo->subpasswordrequired, "t") != 0)
|
if (!subinfo->subpasswordrequired)
|
||||||
appendPQExpBuffer(query, ", password_required = false");
|
appendPQExpBuffer(query, ", password_required = false");
|
||||||
|
|
||||||
if (strcmp(subinfo->subrunasowner, "t") == 0)
|
if (subinfo->subrunasowner)
|
||||||
appendPQExpBufferStr(query, ", run_as_owner = true");
|
appendPQExpBufferStr(query, ", run_as_owner = true");
|
||||||
|
|
||||||
if (strcmp(subinfo->subfailover, "t") == 0)
|
if (subinfo->subfailover)
|
||||||
appendPQExpBufferStr(query, ", failover = true");
|
appendPQExpBufferStr(query, ", failover = true");
|
||||||
|
|
||||||
if (strcmp(subinfo->subsynccommit, "off") != 0)
|
if (strcmp(subinfo->subsynccommit, "off") != 0)
|
||||||
@ -5303,7 +5300,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
|
|||||||
appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
|
appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcmp(subinfo->subenabled, "t") == 0)
|
if (subinfo->subenabled)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Enable the subscription to allow the replication to continue
|
* Enable the subscription to allow the replication to continue
|
||||||
|
@ -664,20 +664,20 @@ typedef struct _SubscriptionInfo
|
|||||||
{
|
{
|
||||||
DumpableObject dobj;
|
DumpableObject dobj;
|
||||||
const char *rolname;
|
const char *rolname;
|
||||||
char *subenabled;
|
bool subenabled;
|
||||||
char *subbinary;
|
bool subbinary;
|
||||||
char *substream;
|
char substream;
|
||||||
char *subtwophasestate;
|
char subtwophasestate;
|
||||||
char *subdisableonerr;
|
bool subdisableonerr;
|
||||||
char *subpasswordrequired;
|
bool subpasswordrequired;
|
||||||
char *subrunasowner;
|
bool subrunasowner;
|
||||||
|
bool subfailover;
|
||||||
char *subconninfo;
|
char *subconninfo;
|
||||||
char *subslotname;
|
char *subslotname;
|
||||||
char *subsynccommit;
|
char *subsynccommit;
|
||||||
char *subpublications;
|
char *subpublications;
|
||||||
char *suborigin;
|
char *suborigin;
|
||||||
char *suboriginremotelsn;
|
char *suboriginremotelsn;
|
||||||
char *subfailover;
|
|
||||||
} SubscriptionInfo;
|
} SubscriptionInfo;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
#include "catalog/pg_default_acl_d.h"
|
#include "catalog/pg_default_acl_d.h"
|
||||||
#include "catalog/pg_proc_d.h"
|
#include "catalog/pg_proc_d.h"
|
||||||
#include "catalog/pg_statistic_ext_d.h"
|
#include "catalog/pg_statistic_ext_d.h"
|
||||||
|
#include "catalog/pg_subscription_d.h"
|
||||||
#include "catalog/pg_type_d.h"
|
#include "catalog/pg_type_d.h"
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "common/logging.h"
|
#include "common/logging.h"
|
||||||
@ -6679,9 +6680,9 @@ describeSubscriptions(const char *pattern, bool verbose)
|
|||||||
if (pset.sversion >= 160000)
|
if (pset.sversion >= 160000)
|
||||||
appendPQExpBuffer(&buf,
|
appendPQExpBuffer(&buf,
|
||||||
", (CASE substream\n"
|
", (CASE substream\n"
|
||||||
" WHEN 'f' THEN 'off'\n"
|
" WHEN " CppAsString2(LOGICALREP_STREAM_OFF) " THEN 'off'\n"
|
||||||
" WHEN 't' THEN 'on'\n"
|
" WHEN " CppAsString2(LOGICALREP_STREAM_ON) " THEN 'on'\n"
|
||||||
" WHEN 'p' THEN 'parallel'\n"
|
" WHEN " CppAsString2(LOGICALREP_STREAM_PARALLEL) " THEN 'parallel'\n"
|
||||||
" END) AS \"%s\"\n",
|
" END) AS \"%s\"\n",
|
||||||
gettext_noop("Streaming"));
|
gettext_noop("Streaming"));
|
||||||
else
|
else
|
||||||
|
@ -23,26 +23,6 @@
|
|||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
/*
|
|
||||||
* two_phase tri-state values. See comments atop worker.c to know more about
|
|
||||||
* these states.
|
|
||||||
*/
|
|
||||||
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
|
|
||||||
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
|
|
||||||
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The subscription will request the publisher to only send changes that do not
|
|
||||||
* have any origin.
|
|
||||||
*/
|
|
||||||
#define LOGICALREP_ORIGIN_NONE "none"
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The subscription will request the publisher to send changes regardless
|
|
||||||
* of their origin.
|
|
||||||
*/
|
|
||||||
#define LOGICALREP_ORIGIN_ANY "any"
|
|
||||||
|
|
||||||
/* ----------------
|
/* ----------------
|
||||||
* pg_subscription definition. cpp turns this into
|
* pg_subscription definition. cpp turns this into
|
||||||
* typedef struct FormData_pg_subscription
|
* typedef struct FormData_pg_subscription
|
||||||
@ -159,6 +139,28 @@ typedef struct Subscription
|
|||||||
* specified origin */
|
* specified origin */
|
||||||
} Subscription;
|
} Subscription;
|
||||||
|
|
||||||
|
#ifdef EXPOSE_TO_CLIENT_CODE
|
||||||
|
|
||||||
|
/*
|
||||||
|
* two_phase tri-state values. See comments atop worker.c to know more about
|
||||||
|
* these states.
|
||||||
|
*/
|
||||||
|
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
|
||||||
|
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
|
||||||
|
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The subscription will request the publisher to only send changes that do not
|
||||||
|
* have any origin.
|
||||||
|
*/
|
||||||
|
#define LOGICALREP_ORIGIN_NONE "none"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The subscription will request the publisher to send changes regardless
|
||||||
|
* of their origin.
|
||||||
|
*/
|
||||||
|
#define LOGICALREP_ORIGIN_ANY "any"
|
||||||
|
|
||||||
/* Disallow streaming in-progress transactions. */
|
/* Disallow streaming in-progress transactions. */
|
||||||
#define LOGICALREP_STREAM_OFF 'f'
|
#define LOGICALREP_STREAM_OFF 'f'
|
||||||
|
|
||||||
@ -174,6 +176,8 @@ typedef struct Subscription
|
|||||||
*/
|
*/
|
||||||
#define LOGICALREP_STREAM_PARALLEL 'p'
|
#define LOGICALREP_STREAM_PARALLEL 'p'
|
||||||
|
|
||||||
|
#endif /* EXPOSE_TO_CLIENT_CODE */
|
||||||
|
|
||||||
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
|
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
|
||||||
extern void FreeSubscription(Subscription *sub);
|
extern void FreeSubscription(Subscription *sub);
|
||||||
extern void DisableSubscription(Oid subid);
|
extern void DisableSubscription(Oid subid);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user