diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out index 2e9ef7c823b..49ffaeea2da 100644 --- a/contrib/test_decoding/expected/replorigin.out +++ b/contrib/test_decoding/expected/replorigin.out @@ -56,6 +56,16 @@ SELECT pg_replication_origin_drop('regress_test_decoding: temp'); SELECT pg_replication_origin_drop('regress_test_decoding: temp'); ERROR: replication origin "regress_test_decoding: temp" does not exist +-- specifying reserved origin names is not supported +SELECT pg_replication_origin_create('any'); +ERROR: replication origin name "any" is reserved +DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved. +SELECT pg_replication_origin_create('none'); +ERROR: replication origin name "none" is reserved +DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved. +SELECT pg_replication_origin_create('pg_replication_origin'); +ERROR: replication origin name "pg_replication_origin" is reserved +DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved. -- various failure checks for undefined slots select pg_replication_origin_advance('regress_test_decoding: temp', '0/1'); ERROR: replication origin "regress_test_decoding: temp" does not exist diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql index 2e28a487773..db06541f565 100644 --- a/contrib/test_decoding/sql/replorigin.sql +++ b/contrib/test_decoding/sql/replorigin.sql @@ -31,6 +31,11 @@ SELECT pg_replication_origin_create('regress_test_decoding: temp'); SELECT pg_replication_origin_drop('regress_test_decoding: temp'); SELECT pg_replication_origin_drop('regress_test_decoding: temp'); +-- specifying reserved origin names is not supported +SELECT pg_replication_origin_create('any'); +SELECT pg_replication_origin_create('none'); +SELECT pg_replication_origin_create('pg_replication_origin'); + -- various failure checks for undefined slots select pg_replication_origin_advance('regress_test_decoding: temp', '0/1'); select pg_replication_origin_session_setup('regress_test_decoding: temp'); diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 670a5406d61..a186e35f009 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7943,6 +7943,20 @@ SCRAM-SHA-256$<iteration count>:&l see . + + + + suborigin text + + + The origin value must be either none or + any. The default is any. + If none, the subscription will request the publisher + to only send changes that don't have an origin. If + any, the publisher sends changes regardless of their + origin. + + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 353ea5def23..64efc21f537 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -207,8 +207,9 @@ ALTER SUBSCRIPTION name RENAME TO < information. The parameters that can be altered are slot_name, synchronous_commit, - binary, streaming, and - disable_on_error. + binary, streaming, + disable_on_error, and + origin. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 34b3264b261..7390c715bc3 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -302,6 +302,21 @@ CREATE SUBSCRIPTION subscription_name + + + origin (string) + + + Specifies whether the subscription will request the publisher to only + send changes that don't have an origin or send changes regardless of + origin. Setting origin to none + means that the subscription will request the publisher to only send + changes that don't have an origin. Setting origin + to any means that the publisher sends changes + regardless of their origin. The default is any. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8856ce3b501..33ae3da8aeb 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -106,6 +106,14 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); + /* Get origin */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_suborigin, + &isnull); + Assert(!isnull); + sub->origin = TextDatumGetCString(datum); + ReleaseSysCache(tup); return sub; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b9..f369b1fc141 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, subslotname, - subsynccommit, subpublications) + subbinary, substream, subtwophasestate, subdisableonerr, + subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index bdc12087241..bd0cc0848d7 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,6 +64,7 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 +#define SUBOPT_ORIGIN 0x00001000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -86,6 +87,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + char *origin; XLogRecPtr lsn; } SubOpts; @@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_COPY_DATA)); - /* Set default values for the boolean supported options. */ + /* Set default values for the supported options. */ if (IsSet(supported_opts, SUBOPT_CONNECT)) opts->connect = true; if (IsSet(supported_opts, SUBOPT_ENABLED)) @@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) opts->disableonerr = false; + if (IsSet(supported_opts, SUBOPT_ORIGIN)) + opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); /* Parse options */ foreach(lc, stmt_options) @@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_ORIGIN) && + strcmp(defel->defname, "origin") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_ORIGIN)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_ORIGIN; + pfree(opts->origin); + + /* + * Even though the "origin" parameter allows only "none" and "any" + * values, it is implemented as a string type so that the + * parameter can be extended in future versions to support + * filtering using origin names specified by the user. + */ + opts->origin = defGetString(defel); + + if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) && + (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized origin value: \"%s\"", opts->origin)); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CStringGetTextDatum(opts.synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + values[Anum_pg_subscription_suborigin - 1] = + CStringGetTextDatum(opts.origin); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) + { + values[Anum_pg_subscription_suborigin - 1] = + CStringGetTextDatum(opts.origin); + replaces[Anum_pg_subscription_suborigin - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0b775b1e985..da9c359af10 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + if (options->proto.logical.origin && + PQserverVersion(conn->streamConn) >= 160000) + appendStringInfo(&cmd, ", origin '%s'", + options->proto.logical.origin); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 21937ab2d31..c72ad6b93de 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -77,6 +77,7 @@ #include "access/xloginsert.h" #include "catalog/catalog.h" #include "catalog/indexing.h" +#include "catalog/pg_subscription.h" #include "funcapi.h" #include "miscadmin.h" #include "nodes/execnodes.h" @@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK) } +/* + * IsReservedOriginName + * True iff name is either "none" or "any". + */ +static bool +IsReservedOriginName(const char *name) +{ + return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) || + (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0)); +} + /* --------------------------------------------------------------------------- * Functions for working with replication origins themselves. * --------------------------------------------------------------------------- @@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS) name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); - /* Replication origins "pg_xxx" are reserved for internal use */ - if (IsReservedName(name)) + /* + * Replication origins "any and "none" are reserved for system options. + * The origins "pg_xxx" are reserved for internal use. + */ + if (IsReservedName(name) || IsReservedOriginName(name)) ereport(ERROR, (errcode(ERRCODE_RESERVED_NAME), errmsg("replication origin name \"%s\" is reserved", name), - errdetail("Origin names starting with \"pg_\" are reserved."))); + errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.", + LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE))); /* * If built with appropriate switch, whine when regression-testing diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 38e3b1c1b3c..5f8c5417630 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3077,6 +3077,7 @@ maybe_reread_subscription(void) strcmp(newsub->slotname, MySubscription->slotname) != 0 || newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || + strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || !equal(newsub->publications, MySubscription->publications)) { @@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + options.proto.logical.origin = pstrdup(MySubscription->origin); if (!am_tablesync_worker()) { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ba8a24d0999..a3c1ba8a402 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -16,6 +16,7 @@ #include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_subscription.h" #include "commands/defrem.h" #include "executor/executor.h" #include "fmgr.h" @@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; +static bool publish_no_origin; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool origin_option_given = false; data->binary = false; data->streaming = false; @@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "origin") == 0) + { + if (origin_option_given) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options")); + origin_option_given = true; + + data->origin = defGetString(defel); + if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0) + publish_no_origin = true; + else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0) + publish_no_origin = false; + else + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized origin value: \"%s\"", data->origin)); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Currently we always forward. + * Return true if the data is associated with an origin and the user has + * requested the changes that don't have an origin, false otherwise. */ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) { + if (publish_no_origin && origin_id != InvalidRepOriginId) + return true; + return false; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e4fdb6b75b0..f9c51d1e679 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4412,6 +4412,7 @@ getSubscriptions(Archive *fout) int i_substream; int i_subtwophasestate; int i_subdisableonerr; + int i_suborigin; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4461,13 +4462,18 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, " s.subtwophasestate,\n" - " s.subdisableonerr\n"); + " s.subdisableonerr,\n"); else appendPQExpBuffer(query, " '%c' AS subtwophasestate,\n" - " false AS subdisableonerr\n", + " false AS subdisableonerr,\n", LOGICALREP_TWOPHASE_STATE_DISABLED); + if (fout->remoteVersion >= 160000) + appendPQExpBufferStr(query, " s.suborigin\n"); + else + appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY); + appendPQExpBufferStr(query, "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" @@ -4493,6 +4499,7 @@ getSubscriptions(Archive *fout) i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); + i_suborigin = PQfnumber(res, "suborigin"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4522,6 +4529,7 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4595,6 +4603,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subdisableonerr, "t") == 0) appendPQExpBufferStr(query, ", disable_on_error = true"); + if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0) + appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 1d21c2906f1..69ee939d449 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo char *substream; char *subtwophasestate; char *subdisableonerr; + char *suborigin; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 1f08716f690..b10e1c4c0d4 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2465,6 +2465,28 @@ my %tests = ( like => { %full_runs, section_post_data => 1, }, }, + 'CREATE SUBSCRIPTION sub2' => { + create_order => 50, + create_sql => 'CREATE SUBSCRIPTION sub2 + CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 + WITH (connect = false, origin = none);', + regexp => qr/^ + \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE SUBSCRIPTION sub3' => { + create_order => 50, + create_sql => 'CREATE SUBSCRIPTION sub3 + CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 + WITH (connect = false, origin = any);', + regexp => qr/^ + \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'ALTER PUBLICATION pub1 ADD TABLE test_table' => { create_order => 51, create_sql => diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 1938e1d6ec8..327a69487bb 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6469,7 +6469,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6511,6 +6511,11 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Two-phase commit"), gettext_noop("Disable on error")); + if (pset.sversion >= 160000) + appendPQExpBuffer(&buf, + ", suborigin AS \"%s\"\n", + gettext_noop("Origin")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index e572f585ef8..92207d2e160 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1873,7 +1873,8 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "slot_name", "streaming", "synchronous_commit"); + COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name", + "streaming", "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3152,8 +3153,8 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "slot_name", "streaming", - "synchronous_commit", "two_phase"); + "disable_on_error", "enabled", "origin", "slot_name", + "streaming", "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index c27fe0fcd81..90379e05cbc 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202207201 +#define CATALOG_VERSION_NO 202207211 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index d1260f590cf..c9a3026b283 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -31,6 +31,18 @@ #define LOGICALREP_TWOPHASE_STATE_PENDING 'p' #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' +/* + * The subscription will request the publisher to only send changes that do not + * have any origin. + */ +#define LOGICALREP_ORIGIN_NONE "none" + +/* + * The subscription will request the publisher to send changes regardless + * of their origin. + */ +#define LOGICALREP_ORIGIN_ANY "any" + /* ---------------- * pg_subscription definition. cpp turns this into * typedef struct FormData_pg_subscription @@ -87,6 +99,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* List of publications subscribed to */ text subpublications[1] BKI_FORCE_NOT_NULL; + + /* Only publish data originating from the specified origin */ + text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); #endif } FormData_pg_subscription; @@ -118,6 +133,8 @@ typedef struct Subscription char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ + char *origin; /* Only publish data originating from the + * specified origin */ } Subscription; extern Subscription *GetSubscription(Oid subid, bool missing_ok); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a5..02027550e25 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + char *origin; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 81184aa92f3..88d7cc6abcb 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,6 +183,8 @@ typedef struct bool streaming; /* Streaming of large transactions */ bool twophase; /* Streaming of two-phase transactions at * prepare time */ + char *origin; /* Only publish data originating from the + * specified origin */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 5db7146e061..ef0ebf96b90 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +-- fail - origin must be either none or any +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); +ERROR: unrecognized origin value: "foo" +-- now it works +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | none | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 +(1 row) + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -96,10 +118,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -179,19 +201,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -202,19 +224,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -247,10 +269,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -284,10 +306,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -296,10 +318,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -323,18 +345,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | any | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 74c38ead5d6..4425fafc46c 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU ALTER SUBSCRIPTION regress_testsub3 ENABLE; ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; +-- fail - origin must be either none or any +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none); +\dRs+ regress_testsub4 +ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); +\dRs+ regress_testsub4 + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl new file mode 100644 index 00000000000..e9241d29966 --- /dev/null +++ b/src/test/subscription/t/030_origin.pl @@ -0,0 +1,155 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test the CREATE SUBSCRIPTION 'origin' parameter. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################################################################### +# Setup a bidirectional logical replication between node_A & node_B +############################################################################### + +# Initialize nodes +# node_A +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->start; +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->start; + +# Create table on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); + +# Create the same table on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); +my $appname_B1 = 'tap_sub_B1'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B1 + CONNECTION '$node_A_connstr application_name=$appname_B1' + PUBLICATION tap_pub_A + WITH (origin = none)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); +my $appname_A = 'tap_sub_A'; +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = off)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is(1, 1, 'Bidirectional replication setup is complete'); + +my $result; + +############################################################################### +# Check that bidirectional logical replication setup does not cause infinite +# recursive insertion. +############################################################################### + +# insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);"); +$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);"); + +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +# check that transaction was committed on subscriber(s) +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is( $result, qq(11 +21), + 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' +); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is( $result, qq(11 +21), + 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' +); + +$node_A->safe_psql('postgres', "DELETE FROM tab;"); + +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +############################################################################### +# Check that remote data of node_B (that originated from node_C) is not +# published to node_A. +############################################################################### +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +# Initialize node node_C +my $node_C = PostgreSQL::Test::Cluster->new('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->start; + +$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); + +# Setup logical replication +# node_C (pub) -> node_B (sub) +my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; +$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab"); + +my $appname_B2 = 'tap_sub_B2'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B2 + CONNECTION '$node_C_connstr application_name=$appname_B2' + PUBLICATION tap_pub_C + WITH (origin = none)"); + +$node_C->wait_for_catchup($appname_B2); + +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert a record +$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); + +$node_C->wait_for_catchup($appname_B2); +$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($appname_B1); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(32), 'The node_C data replicated to node_B'); + +# check that the data published from node_C to node_B is not sent to node_A +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), + 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none' +); + +# shutdown +$node_B->stop('fast'); +$node_A->stop('fast'); +$node_C->stop('fast'); + +done_testing();