diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 2520f6c50dd..87be1fb1c26 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -456,6 +456,56 @@ make prefix=/usr/local/pgsql.new install
+
+ Prepare for subscriber upgrades
+
+
+ Setup the
+ subscriber configurations in the new subscriber.
+ pg_upgrade attempts to migrate subscription
+ dependencies which includes the subscription's table information present in
+ pg_subscription_rel
+ system catalog and also the subscription's replication origin. This allows
+ logical replication on the new subscriber to continue from where the
+ old subscriber was up to. Migration of subscription dependencies is only
+ supported when the old cluster is version 17.0 or later. Subscription
+ dependencies on clusters before version 17.0 will silently be ignored.
+
+
+
+ There are some prerequisites for pg_upgrade to
+ be able to upgrade the subscriptions. If these are not met an error
+ will be reported.
+
+
+
+
+
+ All the subscription tables in the old subscriber should be in state
+ i (initialize) or r (ready). This
+ can be verified by checking pg_subscription_rel.srsubstate.
+
+
+
+
+ The replication origin entry corresponding to each of the subscriptions
+ should exist in the old cluster. This can be found by checking
+ pg_subscription and
+ pg_replication_origin
+ system tables.
+
+
+
+
+ The new cluster must have
+ max_replication_slots
+ configured to a value greater than or equal to the number of
+ subscriptions present in the old cluster.
+
+
+
+
+
Stop both servers
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f1362..7167377d82e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -228,10 +228,14 @@ textarray_to_stringlist(ArrayType *textarray)
/*
* Add new state record for a subscription table.
+ *
+ * If retain_lock is true, then don't release the locks taken in this function.
+ * We normally release the locks at the end of transaction but in binary-upgrade
+ * mode, we expect to release those immediately.
*/
void
AddSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn)
+ XLogRecPtr sublsn, bool retain_lock)
{
Relation rel;
HeapTuple tup;
@@ -269,7 +273,15 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
heap_freetuple(tup);
/* Cleanup. */
- table_close(rel, NoLock);
+ if (retain_lock)
+ {
+ table_close(rel, NoLock);
+ }
+ else
+ {
+ table_close(rel, RowExclusiveLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+ }
}
/*
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index edc82c11beb..dd067d39ade 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -773,7 +773,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
rv->schemaname, rv->relname);
AddSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr, true);
}
/*
@@ -943,7 +943,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr, true);
ereport(DEBUG1,
(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
rv->schemaname, rv->relname, sub->name)));
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 92921b0239d..14368aafbe0 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,15 +11,24 @@
#include "postgres.h"
+#include "access/relation.h"
+#include "access/table.h"
#include "catalog/binary_upgrade.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
+#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
#include "replication/logical.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
+#include "storage/lmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
#define CHECK_IS_BINARY_UPGRADE \
@@ -305,3 +314,100 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(!found_pending_wal);
}
+
+/*
+ * binary_upgrade_add_sub_rel_state
+ *
+ * Add the relation with the specified relation state to pg_subscription_rel
+ * catalog.
+ */
+Datum
+binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS)
+{
+ Relation subrel;
+ Relation rel;
+ Oid subid;
+ char *subname;
+ Oid relid;
+ char relstate;
+ XLogRecPtr sublsn;
+
+ CHECK_IS_BINARY_UPGRADE;
+
+ /* We must check these things before dereferencing the arguments */
+ if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
+ elog(ERROR, "null argument to binary_upgrade_add_sub_rel_state is not allowed");
+
+ subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ relid = PG_GETARG_OID(1);
+ relstate = PG_GETARG_CHAR(2);
+ sublsn = PG_ARGISNULL(3) ? InvalidXLogRecPtr : PG_GETARG_LSN(3);
+
+ subrel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ subid = get_subscription_oid(subname, false);
+ rel = relation_open(relid, AccessShareLock);
+
+ /*
+ * Since there are no concurrent ALTER/DROP SUBSCRIPTION commands during
+ * the upgrade process, and the apply worker (which builds cache based on
+ * the subscription catalog) is not running, the locks can be released
+ * immediately.
+ */
+ AddSubscriptionRelState(subid, relid, relstate, sublsn, false);
+ relation_close(rel, AccessShareLock);
+ table_close(subrel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * binary_upgrade_replorigin_advance
+ *
+ * Update the remote_lsn for the subscriber's replication origin.
+ */
+Datum
+binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
+{
+ Relation rel;
+ Oid subid;
+ char *subname;
+ char originname[NAMEDATALEN];
+ RepOriginId node;
+ XLogRecPtr remote_commit;
+
+ CHECK_IS_BINARY_UPGRADE;
+
+ /*
+ * We must ensure a non-NULL subscription name before dereferencing the
+ * arguments.
+ */
+ if (PG_ARGISNULL(0))
+ elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed");
+
+ subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ remote_commit = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ subid = get_subscription_oid(subname, false);
+
+ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
+
+ /* Lock to prevent the replication origin from vanishing */
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ node = replorigin_by_name(originname, false);
+
+ /*
+ * The server will be stopped after setting up the objects in the new
+ * cluster and the origins will be flushed during the shutdown checkpoint.
+ * This will ensure that the latest LSN values for origin will be
+ * available after the upgrade.
+ */
+ replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
+ false /* backward */ ,
+ false /* WAL log */ );
+
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 8b0c1e7b53e..764a39fcb95 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -24,6 +24,7 @@
#include "catalog/pg_operator_d.h"
#include "catalog/pg_proc_d.h"
#include "catalog/pg_publication_d.h"
+#include "catalog/pg_subscription_d.h"
#include "catalog/pg_type_d.h"
#include "common/hashfn.h"
#include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
pg_log_info("reading subscriptions");
getSubscriptions(fout);
+ pg_log_info("reading subscription membership of tables");
+ getSubscriptionTables(fout);
+
free(inhinfo); /* not needed any longer */
*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
return (PublicationInfo *) dobj;
}
+/*
+ * findSubscriptionByOid
+ * finds the DumpableObject for the subscription with the given oid
+ * returns NULL if not found
+ */
+SubscriptionInfo *
+findSubscriptionByOid(Oid oid)
+{
+ CatalogId catId;
+ DumpableObject *dobj;
+
+ catId.tableoid = SubscriptionRelationId;
+ catId.oid = oid;
+ dobj = findObjectByCatalogId(catId);
+ Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
+ return (SubscriptionInfo *) dobj;
+}
+
/*
* recordExtensionMembership
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 050a8312265..8973ec715c1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -297,6 +297,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
+static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
static void dumpDatabase(Archive *fout);
static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
const char *dbname, Oid dboid);
@@ -4638,6 +4639,8 @@ getSubscriptions(Archive *fout)
int i_subsynccommit;
int i_subpublications;
int i_suborigin;
+ int i_suboriginremotelsn;
+ int i_subenabled;
int i,
ntups;
@@ -4693,16 +4696,30 @@ getSubscriptions(Archive *fout)
appendPQExpBufferStr(query,
" s.subpasswordrequired,\n"
" s.subrunasowner,\n"
- " s.suborigin\n");
+ " s.suborigin,\n");
else
appendPQExpBuffer(query,
" 't' AS subpasswordrequired,\n"
" 't' AS subrunasowner,\n"
- " '%s' AS suborigin\n",
+ " '%s' AS suborigin,\n",
LOGICALREP_ORIGIN_ANY);
+ if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
+ appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
+ " s.subenabled\n");
+ else
+ appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
+ " false AS subenabled\n");
+
+ appendPQExpBufferStr(query,
+ "FROM pg_subscription s\n");
+
+ if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
+ appendPQExpBufferStr(query,
+ "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
+ " ON o.external_id = 'pg_' || s.oid::text \n");
+
appendPQExpBufferStr(query,
- "FROM pg_subscription s\n"
"WHERE s.subdbid = (SELECT oid FROM pg_database\n"
" WHERE datname = current_database())");
@@ -4729,6 +4746,8 @@ getSubscriptions(Archive *fout)
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_suborigin = PQfnumber(res, "suborigin");
+ i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+ i_subenabled = PQfnumber(res, "subenabled");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -4766,6 +4785,13 @@ getSubscriptions(Archive *fout)
subinfo[i].subpublications =
pg_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+ if (PQgetisnull(res, i, i_suboriginremotelsn))
+ subinfo[i].suboriginremotelsn = NULL;
+ else
+ subinfo[i].suboriginremotelsn =
+ pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+ subinfo[i].subenabled =
+ pg_strdup(PQgetvalue(res, i, i_subenabled));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4775,6 +4801,162 @@ getSubscriptions(Archive *fout)
destroyPQExpBuffer(query);
}
+/*
+ * getSubscriptionTables
+ * Get information about subscription membership for dumpable tables. This
+ * will be used only in binary-upgrade mode for PG17 or later versions.
+ */
+void
+getSubscriptionTables(Archive *fout)
+{
+ DumpOptions *dopt = fout->dopt;
+ SubscriptionInfo *subinfo = NULL;
+ SubRelInfo *subrinfo;
+ PGresult *res;
+ int i_srsubid;
+ int i_srrelid;
+ int i_srsubstate;
+ int i_srsublsn;
+ int ntups;
+ Oid last_srsubid = InvalidOid;
+
+ if (dopt->no_subscriptions || !dopt->binary_upgrade ||
+ fout->remoteVersion < 170000)
+ return;
+
+ res = ExecuteSqlQuery(fout,
+ "SELECT srsubid, srrelid, srsubstate, srsublsn "
+ "FROM pg_catalog.pg_subscription_rel "
+ "ORDER BY srsubid",
+ PGRES_TUPLES_OK);
+ ntups = PQntuples(res);
+ if (ntups == 0)
+ goto cleanup;
+
+ /* Get pg_subscription_rel attributes */
+ i_srsubid = PQfnumber(res, "srsubid");
+ i_srrelid = PQfnumber(res, "srrelid");
+ i_srsubstate = PQfnumber(res, "srsubstate");
+ i_srsublsn = PQfnumber(res, "srsublsn");
+
+ subrinfo = pg_malloc(ntups * sizeof(SubRelInfo));
+ for (int i = 0; i < ntups; i++)
+ {
+ Oid cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
+ Oid relid = atooid(PQgetvalue(res, i, i_srrelid));
+ TableInfo *tblinfo;
+
+ /*
+ * If we switched to a new subscription, check if the subscription
+ * exists.
+ */
+ if (cur_srsubid != last_srsubid)
+ {
+ subinfo = findSubscriptionByOid(cur_srsubid);
+ if (subinfo == NULL)
+ pg_fatal("subscription with OID %u does not exist", cur_srsubid);
+
+ last_srsubid = cur_srsubid;
+ }
+
+ tblinfo = findTableByOid(relid);
+ if (tblinfo == NULL)
+ pg_fatal("failed sanity check, table with OID %u not found",
+ relid);
+
+ /* OK, make a DumpableObject for this relationship */
+ subrinfo[i].dobj.objType = DO_SUBSCRIPTION_REL;
+ subrinfo[i].dobj.catId.tableoid = relid;
+ subrinfo[i].dobj.catId.oid = cur_srsubid;
+ AssignDumpId(&subrinfo[i].dobj);
+ subrinfo[i].dobj.name = pg_strdup(subinfo->dobj.name);
+ subrinfo[i].tblinfo = tblinfo;
+ subrinfo[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+ if (PQgetisnull(res, i, i_srsublsn))
+ subrinfo[i].srsublsn = NULL;
+ else
+ subrinfo[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+
+ subrinfo[i].subinfo = subinfo;
+
+ /* Decide whether we want to dump it */
+ selectDumpableObject(&(subrinfo[i].dobj), fout);
+ }
+
+cleanup:
+ PQclear(res);
+}
+
+/*
+ * dumpSubscriptionTable
+ * Dump the definition of the given subscription table mapping. This will be
+ * used only in binary-upgrade mode for PG17 or later versions.
+ */
+static void
+dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ SubscriptionInfo *subinfo = subrinfo->subinfo;
+ PQExpBuffer query;
+ char *tag;
+
+ /* Do nothing in data-only dump */
+ if (dopt->dataOnly)
+ return;
+
+ Assert(fout->dopt->binary_upgrade && fout->remoteVersion >= 170000);
+
+ tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name);
+
+ query = createPQExpBuffer();
+
+ if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+ {
+ /*
+ * binary_upgrade_add_sub_rel_state will add the subscription relation
+ * to pg_subscription_rel table. This will be used only in
+ * binary-upgrade mode.
+ */
+ appendPQExpBufferStr(query,
+ "\n-- For binary upgrade, must preserve the subscriber table.\n");
+ appendPQExpBufferStr(query,
+ "SELECT pg_catalog.binary_upgrade_add_sub_rel_state(");
+ appendStringLiteralAH(query, subrinfo->dobj.name, fout);
+ appendPQExpBuffer(query,
+ ", %u, '%c'",
+ subrinfo->tblinfo->dobj.catId.oid,
+ subrinfo->srsubstate);
+
+ if (subrinfo->srsublsn && subrinfo->srsublsn[0] != '\0')
+ appendPQExpBuffer(query, ", '%s'", subrinfo->srsublsn);
+ else
+ appendPQExpBuffer(query, ", NULL");
+
+ appendPQExpBufferStr(query, ");\n");
+ }
+
+ /*
+ * There is no point in creating a drop query as the drop is done by table
+ * drop. (If you think to change this, see also _printTocEntry().)
+ * Although this object doesn't really have ownership as such, set the
+ * owner field anyway to ensure that the command is run by the correct
+ * role at restore time.
+ */
+ if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+ ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId,
+ ARCHIVE_OPTS(.tag = tag,
+ .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name,
+ .owner = subinfo->rolname,
+ .description = "SUBSCRIPTION TABLE",
+ .section = SECTION_POST_DATA,
+ .createStmt = query->data));
+
+ /* These objects can't currently have comments or seclabels */
+
+ free(tag);
+ destroyPQExpBuffer(query);
+}
+
/*
* dumpSubscription
* dump the definition of the given subscription
@@ -4855,6 +5037,43 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
+ /*
+ * In binary-upgrade mode, we allow the replication to continue after the
+ * upgrade.
+ */
+ if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
+ {
+ if (subinfo->suboriginremotelsn)
+ {
+ /*
+ * Preserve the remote_lsn for the subscriber's replication
+ * origin. This value is required to start the replication from
+ * the position before the upgrade. This value will be stale if
+ * the publisher gets upgraded before the subscriber node.
+ * However, this shouldn't be a problem as the upgrade of the
+ * publisher ensures that all the transactions were replicated
+ * before upgrading it.
+ */
+ appendPQExpBufferStr(query,
+ "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n");
+ appendPQExpBufferStr(query,
+ "SELECT pg_catalog.binary_upgrade_replorigin_advance(");
+ appendStringLiteralAH(query, subinfo->dobj.name, fout);
+ appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
+ }
+
+ if (strcmp(subinfo->subenabled, "t") == 0)
+ {
+ /*
+ * Enable the subscription to allow the replication to continue
+ * after the upgrade.
+ */
+ appendPQExpBufferStr(query,
+ "\n-- For binary upgrade, must preserve the subscriber's running state.\n");
+ appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE;\n", qsubname);
+ }
+ }
+
if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = subinfo->dobj.name,
@@ -10477,6 +10696,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_SUBSCRIPTION:
dumpSubscription(fout, (const SubscriptionInfo *) dobj);
break;
+ case DO_SUBSCRIPTION_REL:
+ dumpSubscriptionTable(fout, (const SubRelInfo *) dobj);
+ break;
case DO_PRE_DATA_BOUNDARY:
case DO_POST_DATA_BOUNDARY:
/* never dumped, nothing to do */
@@ -18543,6 +18765,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
case DO_PUBLICATION_REL:
case DO_PUBLICATION_TABLE_IN_SCHEMA:
case DO_SUBSCRIPTION:
+ case DO_SUBSCRIPTION_REL:
/* Post-data objects: must come after the post-data boundary */
addObjectDependency(dobj, postDataBound->dumpId);
break;
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 673ca5c92d1..cf8d14c38f0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -83,6 +83,7 @@ typedef enum
DO_PUBLICATION_REL,
DO_PUBLICATION_TABLE_IN_SCHEMA,
DO_SUBSCRIPTION,
+ DO_SUBSCRIPTION_REL, /* see note for SubRelInfo */
} DumpableObjectType;
/*
@@ -661,6 +662,7 @@ typedef struct _SubscriptionInfo
{
DumpableObject dobj;
const char *rolname;
+ char *subenabled;
char *subbinary;
char *substream;
char *subtwophasestate;
@@ -672,8 +674,28 @@ typedef struct _SubscriptionInfo
char *subsynccommit;
char *subpublications;
char *suborigin;
+ char *suboriginremotelsn;
} SubscriptionInfo;
+/*
+ * The SubRelInfo struct is used to represent a subscription relation.
+ *
+ * XXX Currently, the subscription tables are added to the subscription after
+ * enabling the subscription in binary-upgrade mode. As the apply workers will
+ * not be started in binary_upgrade mode the ordering of enable subscription
+ * does not matter. The order of adding the subscription tables to the
+ * subscription and enabling the subscription should be taken care of if this
+ * feature will be supported in a non-binary-upgrade mode in the future.
+ */
+typedef struct _SubRelInfo
+{
+ DumpableObject dobj;
+ SubscriptionInfo *subinfo;
+ TableInfo *tblinfo;
+ char srsubstate;
+ char *srsublsn;
+} SubRelInfo;
+
/*
* common utility functions
*/
@@ -698,6 +720,7 @@ extern CollInfo *findCollationByOid(Oid oid);
extern NamespaceInfo *findNamespaceByOid(Oid oid);
extern ExtensionInfo *findExtensionByOid(Oid oid);
extern PublicationInfo *findPublicationByOid(Oid oid);
+extern SubscriptionInfo *findSubscriptionByOid(Oid oid);
extern void recordExtensionMembership(CatalogId catId, ExtensionInfo *ext);
extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
@@ -757,5 +780,6 @@ extern void getPublicationNamespaces(Archive *fout);
extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
int numTables);
extern void getSubscriptions(Archive *fout);
+extern void getSubscriptionTables(Archive *fout);
#endif /* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index abfea15c09c..e8d9c8ac861 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -94,6 +94,7 @@ enum dbObjectTypePriorities
PRIO_PUBLICATION_REL,
PRIO_PUBLICATION_TABLE_IN_SCHEMA,
PRIO_SUBSCRIPTION,
+ PRIO_SUBSCRIPTION_REL,
PRIO_DEFAULT_ACL, /* done in ACL pass */
PRIO_EVENT_TRIGGER, /* must be next to last! */
PRIO_REFRESH_MATVIEW /* must be last! */
@@ -147,10 +148,11 @@ static const int dbObjectTypePriority[] =
PRIO_PUBLICATION, /* DO_PUBLICATION */
PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */
PRIO_PUBLICATION_TABLE_IN_SCHEMA, /* DO_PUBLICATION_TABLE_IN_SCHEMA */
- PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */
+ PRIO_SUBSCRIPTION, /* DO_SUBSCRIPTION */
+ PRIO_SUBSCRIPTION_REL /* DO_SUBSCRIPTION_REL */
};
-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION_REL + 1),
"array length mismatch");
static DumpId preDataBoundId;
@@ -1472,6 +1474,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
"SUBSCRIPTION (ID %d OID %u)",
obj->dumpId, obj->catId.oid);
return;
+ case DO_SUBSCRIPTION_REL:
+ snprintf(buf, bufsize,
+ "SUBSCRIPTION TABLE (ID %d OID %u)",
+ obj->dumpId, obj->catId.oid);
+ return;
case DO_PRE_DATA_BOUNDARY:
snprintf(buf, bufsize,
"PRE-DATA BOUNDARY (ID %d)",
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fa52aa2c220..87c06628c6b 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -34,7 +34,9 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
static void check_for_new_tablespace_dir(void);
static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
static void check_new_cluster_logical_replication_slots(void);
+static void check_new_cluster_subscription_configuration(void);
static void check_old_cluster_for_valid_slots(bool live_check);
+static void check_old_cluster_subscription_state(void);
/*
@@ -112,13 +114,21 @@ check_and_dump_old_cluster(bool live_check)
check_for_reg_data_type_usage(&old_cluster);
check_for_isn_and_int8_passing_mismatch(&old_cluster);
- /*
- * Logical replication slots can be migrated since PG17. See comments atop
- * get_old_cluster_logical_slot_infos().
- */
if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+ {
+ /*
+ * Logical replication slots can be migrated since PG17. See comments
+ * atop get_old_cluster_logical_slot_infos().
+ */
check_old_cluster_for_valid_slots(live_check);
+ /*
+ * Subscriptions and their dependencies can be migrated since PG17.
+ * See comments atop get_db_subscription_count().
+ */
+ check_old_cluster_subscription_state();
+ }
+
/*
* PG 16 increased the size of the 'aclitem' type, which breaks the
* on-disk format for existing data.
@@ -237,6 +247,8 @@ check_new_cluster(void)
check_for_new_tablespace_dir();
check_new_cluster_logical_replication_slots();
+
+ check_new_cluster_subscription_configuration();
}
@@ -1538,6 +1550,53 @@ check_new_cluster_logical_replication_slots(void)
check_ok();
}
+/*
+ * check_new_cluster_subscription_configuration()
+ *
+ * Verify that the max_replication_slots configuration specified is enough for
+ * creating the subscriptions. This is required to create the replication
+ * origin for each subscription.
+ */
+static void
+check_new_cluster_subscription_configuration(void)
+{
+ PGresult *res;
+ PGconn *conn;
+ int nsubs_on_old;
+ int max_replication_slots;
+
+ /* Subscriptions and their dependencies can be migrated since PG17. */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
+ return;
+
+ nsubs_on_old = count_old_cluster_subscriptions();
+
+ /* Quick return if there are no subscriptions to be migrated. */
+ if (nsubs_on_old == 0)
+ return;
+
+ prep_status("Checking for new cluster configuration for subscriptions");
+
+ conn = connectToServer(&new_cluster, "template1");
+
+ res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings "
+ "WHERE name = 'max_replication_slots';");
+
+ if (PQntuples(res) != 1)
+ pg_fatal("could not determine parameter settings on new cluster");
+
+ max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+ if (nsubs_on_old > max_replication_slots)
+ pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+ "subscriptions (%d) on the old cluster",
+ max_replication_slots, nsubs_on_old);
+
+ PQclear(res);
+ PQfinish(conn);
+
+ check_ok();
+}
+
/*
* check_old_cluster_for_valid_slots()
*
@@ -1613,3 +1672,129 @@ check_old_cluster_for_valid_slots(bool live_check)
check_ok();
}
+
+/*
+ * check_old_cluster_subscription_state()
+ *
+ * Verify that the replication origin corresponding to each of the
+ * subscriptions are present and each of the subscribed tables is in
+ * 'i' (initialize) or 'r' (ready) state.
+ */
+static void
+check_old_cluster_subscription_state(void)
+{
+ FILE *script = NULL;
+ char output_path[MAXPGPATH];
+ int ntup;
+
+ prep_status("Checking for subscription state");
+
+ snprintf(output_path, sizeof(output_path), "%s/%s",
+ log_opts.basedir,
+ "subs_invalid.txt");
+ for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+ {
+ PGresult *res;
+ DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
+ PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
+
+ /* We need to check for pg_replication_origin only once. */
+ if (dbnum == 0)
+ {
+ /*
+ * Check that all the subscriptions have their respective
+ * replication origin.
+ */
+ res = executeQueryOrDie(conn,
+ "SELECT d.datname, s.subname "
+ "FROM pg_catalog.pg_subscription s "
+ "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
+ " ON o.roname = 'pg_' || s.oid "
+ "INNER JOIN pg_catalog.pg_database d "
+ " ON d.oid = s.subdbid "
+ "WHERE o.roname iS NULL;");
+
+ ntup = PQntuples(res);
+ for (int i = 0; i < ntup; i++)
+ {
+ if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %s",
+ output_path, strerror(errno));
+ fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
+ PQgetvalue(res, i, 0),
+ PQgetvalue(res, i, 1));
+ }
+ PQclear(res);
+ }
+
+ /*
+ * We don't allow upgrade if there is a risk of dangling slot or
+ * origin corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state,
+ * while 'r' (ready) state refers to a slot/origin created previously
+ * but already dropped. These states are supported for pg_upgrade. The
+ * other states listed below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
+ * would retain a replication slot, which could not be dropped by the
+ * sync worker spawned after the upgrade because the subscription ID
+ * used for the slot name won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
+ * would retain the replication origin when there is a failure in
+ * tablesync worker immediately after dropping the replication slot in
+ * the publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
+ * a relation upgraded while in this state would expect an origin ID
+ * with the OID of the subscription used before the upgrade, causing
+ * it to fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
+ * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
+ * so we need not allow these states.
+ */
+ res = executeQueryOrDie(conn,
+ "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+ "FROM pg_catalog.pg_subscription_rel r "
+ "LEFT JOIN pg_catalog.pg_subscription s"
+ " ON r.srsubid = s.oid "
+ "LEFT JOIN pg_catalog.pg_class c"
+ " ON r.srrelid = c.oid "
+ "LEFT JOIN pg_catalog.pg_namespace n"
+ " ON c.relnamespace = n.oid "
+ "WHERE r.srsubstate NOT IN ('i', 'r') "
+ "ORDER BY s.subname");
+
+ ntup = PQntuples(res);
+ for (int i = 0; i < ntup; i++)
+ {
+ if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %s",
+ output_path, strerror(errno));
+
+ fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+ PQgetvalue(res, i, 0),
+ active_db->db_name,
+ PQgetvalue(res, i, 1),
+ PQgetvalue(res, i, 2),
+ PQgetvalue(res, i, 3));
+ }
+
+ PQclear(res);
+ PQfinish(conn);
+ }
+
+ if (script)
+ {
+ fclose(script);
+ pg_log(PG_REPORT, "fatal");
+ pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
+ "You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
+ "A list of the problematic subscriptions is in the file:\n"
+ " %s", output_path);
+ }
+ else
+ check_ok();
+}
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 4878aa22bfd..f70742851c4 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -28,6 +28,7 @@ static void print_db_infos(DbInfoArr *db_arr);
static void print_rel_infos(RelInfoArr *rel_arr);
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
+static void get_db_subscription_count(DbInfo *dbinfo);
/*
@@ -293,10 +294,14 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
get_rel_infos(cluster, pDbInfo);
/*
- * Retrieve the logical replication slots infos for the old cluster.
+ * Retrieve the logical replication slots infos and the subscriptions
+ * count for the old cluster.
*/
if (cluster == &old_cluster)
+ {
get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+ get_db_subscription_count(pDbInfo);
+ }
}
if (cluster == &old_cluster)
@@ -730,6 +735,55 @@ count_old_cluster_logical_slots(void)
return slot_count;
}
+/*
+ * get_db_subscription_count()
+ *
+ * Gets the number of subscriptions in the database referred to by "dbinfo".
+ *
+ * Note: This function will not do anything if the old cluster is pre-PG17.
+ * This is because before that the logical slots are not upgraded, so we will
+ * not be able to upgrade the logical replication clusters completely.
+ */
+static void
+get_db_subscription_count(DbInfo *dbinfo)
+{
+ PGconn *conn;
+ PGresult *res;
+
+ /* Subscriptions can be migrated since PG17. */
+ if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
+ return;
+
+ conn = connectToServer(&old_cluster, dbinfo->db_name);
+ res = executeQueryOrDie(conn, "SELECT count(*) "
+ "FROM pg_catalog.pg_subscription WHERE subdbid = %d",
+ dbinfo->db_oid);
+ dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
+
+ PQclear(res);
+ PQfinish(conn);
+}
+
+/*
+ * count_old_cluster_subscriptions()
+ *
+ * Returns the number of subscriptions for all databases.
+ *
+ * Note: this function always returns 0 if the old_cluster is PG16 and prior
+ * because we gather subscriptions only for cluster versions greater than or
+ * equal to PG17. See get_db_subscription_count().
+ */
+int
+count_old_cluster_subscriptions(void)
+{
+ int nsubs = 0;
+
+ for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+ nsubs += old_cluster.dbarr.dbs[dbnum].nsubs;
+
+ return nsubs;
+}
+
static void
free_db_and_rel_infos(DbInfoArr *db_arr)
{
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 3e8a08e0623..32f12f9e273 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -43,6 +43,7 @@ tests += {
't/001_basic.pl',
't/002_pg_upgrade.pl',
't/003_logical_slots.pl',
+ 't/004_subscription.pl',
],
'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
},
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index a710f325ded..d63f13fffcc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -195,6 +195,7 @@ typedef struct
* path */
RelInfoArr rel_arr; /* array of all user relinfos */
LogicalSlotInfoArr slot_arr; /* array of all LogicalSlotInfo */
+ int nsubs; /* number of subscriptions */
} DbInfo;
/*
@@ -421,6 +422,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
const char *new_pgdata);
void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
int count_old_cluster_logical_slots(void);
+int count_old_cluster_subscriptions(void);
/* option.c */
diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl
new file mode 100644
index 00000000000..d08ffffe108
--- /dev/null
+++ b/src/bin/pg_upgrade/t/004_subscription.pl
@@ -0,0 +1,319 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test for pg_upgrade of logical subscription
+use strict;
+use warnings;
+
+use File::Find qw(find);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $publisher = PostgreSQL::Test::Cluster->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+$publisher->start;
+
+# Initialize the old subscriber node
+my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
+$old_sub->init;
+$old_sub->start;
+my $oldbindir = $old_sub->config_data('--bindir');
+
+# Initialize the new subscriber
+my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
+$new_sub->init;
+my $newbindir = $new_sub->config_data('--bindir');
+
+# In a VPATH build, we'll be started in the source directory, but we want
+# to run pg_upgrade in the build directory so that any files generated finish
+# in it, like delete_old_cluster.{sh,bat}.
+chdir ${PostgreSQL::Test::Utils::tmp_check};
+
+# Initial setup
+$publisher->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_upgraded1(id int);
+ CREATE TABLE tab_upgraded2(id int);
+]);
+$old_sub->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_upgraded1(id int);
+ CREATE TABLE tab_upgraded2(id int);
+]);
+
+# Setup logical replication
+my $connstr = $publisher->connstr . ' dbname=postgres';
+
+# Setup an enabled subscription to verify that the running status is retained
+# after upgrade.
+$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1");
+$old_sub->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1"
+);
+$old_sub->wait_for_subscription_sync($publisher, 'regress_sub1');
+
+# Verify that the upgrade should be successful with tables in 'ready'/'init'
+# state along with retaining the replication origin's remote lsn, and
+# subscription's running status.
+$publisher->safe_psql('postgres',
+ "CREATE PUBLICATION regress_pub2 FOR TABLE tab_upgraded1");
+$old_sub->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_sub2 CONNECTION '$connstr' PUBLICATION regress_pub2"
+);
+# Wait till the table tab_upgraded1 reaches 'ready' state
+my $synced_query =
+ "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'r'";
+$old_sub->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for the table to reach ready state";
+
+$publisher->safe_psql('postgres',
+ "INSERT INTO tab_upgraded1 VALUES (generate_series(1,50))");
+$publisher->wait_for_catchup('regress_sub2');
+
+# Change configuration to prepare a subscription table in init state
+$old_sub->append_conf('postgresql.conf',
+ "max_logical_replication_workers = 0");
+$old_sub->restart;
+
+$publisher->safe_psql('postgres',
+ "ALTER PUBLICATION regress_pub2 ADD TABLE tab_upgraded2");
+$old_sub->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_sub2 REFRESH PUBLICATION");
+
+# The table tab_upgraded2 will be in init state as the subscriber
+# configuration for max_logical_replication_workers is set to 0.
+my $result = $old_sub->safe_psql('postgres',
+ "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'i'");
+is($result, qq(t), "Check that the table is in init state");
+
+# Get the replication origin's remote_lsn of the old subscriber
+my $remote_lsn = $old_sub->safe_psql('postgres',
+ "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub2'"
+);
+# Have the subscription in disabled state before upgrade
+$old_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub2 DISABLE");
+
+my $tab_upgraded1_oid = $old_sub->safe_psql('postgres',
+ "SELECT oid FROM pg_class WHERE relname = 'tab_upgraded1'");
+my $tab_upgraded2_oid = $old_sub->safe_psql('postgres',
+ "SELECT oid FROM pg_class WHERE relname = 'tab_upgraded2'");
+
+$old_sub->stop;
+
+# ------------------------------------------------------
+# Check that pg_upgrade is successful when all tables are in ready or in
+# init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is
+# in init state) along with retaining the replication origin's remote lsn
+# and subscription's running status.
+# ------------------------------------------------------
+command_ok(
+ [
+ 'pg_upgrade', '--no-sync', '-d', $old_sub->data_dir,
+ '-D', $new_sub->data_dir, '-b', $oldbindir,
+ '-B', $newbindir, '-s', $new_sub->host,
+ '-p', $old_sub->port, '-P', $new_sub->port,
+ $mode
+ ],
+ 'run of pg_upgrade for old instance when the subscription tables are in init/ready state'
+);
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+ "pg_upgrade_output.d/ removed after successful pg_upgrade");
+
+# ------------------------------------------------------
+# Check that the data inserted to the publisher when the new subscriber is down
+# will be replicated once it is started. Also check that the old subscription
+# states and relations origins are all preserved.
+# ------------------------------------------------------
+$publisher->safe_psql(
+ 'postgres', qq[
+ INSERT INTO tab_upgraded1 VALUES(51);
+ INSERT INTO tab_upgraded2 VALUES(1);
+]);
+
+$new_sub->start;
+
+# The subscription's running status should be preserved. Old subscription
+# regress_sub1 should be enabled and old subscription regress_sub2 should be
+# disabled.
+$result =
+ $new_sub->safe_psql('postgres',
+ "SELECT subname, subenabled FROM pg_subscription ORDER BY subname");
+is( $result, qq(regress_sub1|t
+regress_sub2|f),
+ "check that the subscription's running status are preserved");
+
+my $sub_oid = $new_sub->safe_psql('postgres',
+ "SELECT oid FROM pg_subscription WHERE subname = 'regress_sub2'");
+
+# Subscription relations should be preserved
+$result = $new_sub->safe_psql('postgres',
+ "SELECT srrelid, srsubstate FROM pg_subscription_rel WHERE srsubid = $sub_oid ORDER BY srrelid"
+);
+is( $result, qq($tab_upgraded1_oid|r
+$tab_upgraded2_oid|i),
+ "there should be 2 rows in pg_subscription_rel(representing tab_upgraded1 and tab_upgraded2)"
+);
+
+# The replication origin's remote_lsn should be preserved
+$result = $new_sub->safe_psql('postgres',
+ "SELECT remote_lsn FROM pg_replication_origin_status WHERE external_id = 'pg_' || $sub_oid"
+);
+is($result, qq($remote_lsn), "remote_lsn should have been preserved");
+
+# Enable the subscription
+$new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub2 ENABLE");
+
+# Wait until all tables of subscription 'regress_sub2' are synchronized
+$new_sub->wait_for_subscription_sync($publisher, 'regress_sub2');
+
+# Rows on tab_upgraded1 and tab_upgraded2 should have been replicated
+$result =
+ $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded1");
+is($result, qq(51), "check replicated inserts on new subscriber");
+$result =
+ $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded2");
+is($result, qq(1),
+ "check the data is synced after enabling the subscription for the table that was in init state"
+);
+
+# cleanup
+$new_sub->stop;
+$old_sub->append_conf('postgresql.conf',
+ "max_logical_replication_workers = 4");
+$old_sub->start;
+$old_sub->safe_psql(
+ 'postgres', qq[
+ ALTER SUBSCRIPTION regress_sub1 DISABLE;
+ ALTER SUBSCRIPTION regress_sub1 SET (slot_name = none);
+ DROP SUBSCRIPTION regress_sub1;
+]);
+$old_sub->stop;
+
+# ------------------------------------------------------
+# Check that pg_upgrade fails when max_replication_slots configured in the new
+# cluster is less than the number of subscriptions in the old cluster.
+# ------------------------------------------------------
+my $new_sub1 = PostgreSQL::Test::Cluster->new('new_sub1');
+$new_sub1->init;
+$new_sub1->append_conf('postgresql.conf', "max_replication_slots = 0");
+
+# pg_upgrade will fail because the new cluster has insufficient
+# max_replication_slots.
+command_checks_all(
+ [
+ 'pg_upgrade', '--no-sync',
+ '-d', $old_sub->data_dir,
+ '-D', $new_sub1->data_dir,
+ '-b', $oldbindir,
+ '-B', $newbindir,
+ '-s', $new_sub1->host,
+ '-p', $old_sub->port,
+ '-P', $new_sub1->port,
+ $mode, '--check',
+ ],
+ 1,
+ [
+ qr/max_replication_slots \(0\) must be greater than or equal to the number of subscriptions \(1\) on the old cluster/
+ ],
+ [qr//],
+ 'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+
+# Reset max_replication_slots
+$new_sub1->append_conf('postgresql.conf', "max_replication_slots = 10");
+
+# Drop the subscription
+$old_sub->start;
+$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub2");
+
+# ------------------------------------------------------
+# Check that pg_upgrade refuses to run if:
+# a) there's a subscription with tables in a state other than 'r' (ready) or
+# 'i' (init) and/or
+# b) the subscription has no replication origin.
+# ------------------------------------------------------
+$publisher->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_primary_key(id serial PRIMARY KEY);
+ INSERT INTO tab_primary_key values(1);
+ CREATE PUBLICATION regress_pub3 FOR TABLE tab_primary_key;
+]);
+
+# Insert the same value that is already present in publisher to the primary key
+# column of subscriber so that the table sync will fail.
+$old_sub->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_primary_key(id serial PRIMARY KEY);
+ INSERT INTO tab_primary_key values(1);
+ CREATE SUBSCRIPTION regress_sub3 CONNECTION '$connstr' PUBLICATION regress_pub3;
+]);
+
+# Table will be in 'd' (data is being copied) state as table sync will fail
+# because of primary key constraint error.
+my $started_query =
+ "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'd'";
+$old_sub->poll_query_until('postgres', $started_query)
+ or die
+ "Timed out while waiting for the table state to become 'd' (datasync)";
+
+# Create another subscription and drop the subscription's replication origin
+$old_sub->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub3 WITH (enabled = false)"
+);
+$sub_oid = $old_sub->safe_psql('postgres',
+ "SELECT oid FROM pg_subscription WHERE subname = 'regress_sub4'");
+my $reporigin = 'pg_' . qq($sub_oid);
+$old_sub->safe_psql('postgres',
+ "SELECT pg_replication_origin_drop('$reporigin')");
+
+$old_sub->stop;
+
+command_fails(
+ [
+ 'pg_upgrade', '--no-sync',
+ '-d', $old_sub->data_dir,
+ '-D', $new_sub1->data_dir,
+ '-b', $oldbindir,
+ '-B', $newbindir,
+ '-s', $new_sub1->host,
+ '-p', $old_sub->port,
+ '-P', $new_sub1->port,
+ $mode, '--check',
+ ],
+ 'run of pg_upgrade --check for old instance with relation in \'d\' datasync(invalid) state and missing replication origin'
+);
+
+# Verify the reason why the subscriber cannot be upgraded
+my $sub_relstate_filename;
+
+# Find a txt file that contains a list of tables that cannot be upgraded. We
+# cannot predict the file's path because the output directory contains a
+# milliseconds timestamp. File::Find::find must be used.
+find(
+ sub {
+ if ($File::Find::name =~ m/subs_invalid\.txt/)
+ {
+ $sub_relstate_filename = $File::Find::name;
+ }
+ },
+ $new_sub1->data_dir . "/pg_upgrade_output.d");
+
+# Check the file content which should have tab_primary_key table in invalid
+# state.
+like(
+ slurp_file($sub_relstate_filename),
+ qr/The table sync state \"d\" is not allowed for database:\"postgres\" subscription:\"regress_sub3\" schema:\"public\" relation:\"tab_primary_key\"/m,
+ 'the previous test failed due to subscription table in invalid state');
+
+# Check the file content which should have regress_sub4 subscription.
+like(
+ slurp_file($sub_relstate_filename),
+ qr/The replication origin is missing for database:\"postgres\" subscription:\"regress_sub4\"/m,
+ 'the previous test failed due to missing replication origin');
+
+done_testing();
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index dad0056f796..b3fdbd9066c 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202312271
+#define CATALOG_VERSION_NO 202401021
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9052f5262a2..5b67784731a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11410,6 +11410,16 @@
proname => 'binary_upgrade_logical_slot_has_caught_up', provolatile => 'v',
proparallel => 'u', prorettype => 'bool', proargtypes => 'name',
prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
+{ oid => '8404', descr => 'for use by pg_upgrade (relation for pg_subscription_rel)',
+ proname => 'binary_upgrade_add_sub_rel_state', proisstrict => 'f',
+ provolatile => 'v', proparallel => 'u', prorettype => 'void',
+ proargtypes => 'text oid char pg_lsn',
+ prosrc => 'binary_upgrade_add_sub_rel_state' },
+{ oid => '8405', descr => 'for use by pg_upgrade (remote_lsn for origin)',
+ proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f',
+ provolatile => 'v', proparallel => 'u', prorettype => 'void',
+ proargtypes => 'text pg_lsn',
+ prosrc => 'binary_upgrade_replorigin_advance' },
# conversion functions
{ oid => '4302',
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f5324b710d6..34ec3117a3e 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -81,7 +81,7 @@ typedef struct SubscriptionRelState
} SubscriptionRelState;
extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn);
+ XLogRecPtr sublsn, bool retain_lock);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ee2ad7aa455..f2ea7edac5e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2669,6 +2669,7 @@ SubLinkType
SubOpts
SubPlan
SubPlanState
+SubRelInfo
SubRemoveRels
SubTransactionId
SubXactCallback