mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Allow upgrades to preserve the full subscription's state.
This feature will allow us to replicate the changes on subscriber nodes after the upgrade. Previously, only the subscription metadata information was preserved. Without the list of relations and their state, it's not possible to re-enable the subscriptions without missing some records as the list of relations can only be refreshed after enabling the subscription (and therefore starting the apply worker). Even if we added a way to refresh the subscription while enabling a publication, we still wouldn't know which relations are new on the publication side, and therefore should be fully synced, and which shouldn't. To preserve the subscription relations, this patch teaches pg_dump to restore the content of pg_subscription_rel from the old cluster by using binary_upgrade_add_sub_rel_state SQL function. This is supported only in binary upgrade mode. The subscription's replication origin is needed to ensure that we don't replicate anything twice. To preserve the replication origins, this patch teaches pg_dump to update the replication origin along with creating a subscription by using binary_upgrade_replorigin_advance SQL function to restore the underlying replication origin remote LSN. This is supported only in binary upgrade mode. pg_upgrade will check that all the subscription relations are in 'i' (init) or in 'r' (ready) state and will error out if that's not the case, logging the reason for the failure. This helps to avoid the risk of any dangling slot or origin after the upgrade. Author: Vignesh C, Julien Rouhaud, Shlok Kyal Reviewed-by: Peter Smith, Masahiko Sawada, Michael Paquier, Amit Kapila, Hayato Kuroda Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
This commit is contained in:
		| @@ -456,6 +456,56 @@ make prefix=/usr/local/pgsql.new install | ||||
|  | ||||
|    </step> | ||||
|  | ||||
|    <step> | ||||
|     <title>Prepare for subscriber upgrades</title> | ||||
|  | ||||
|     <para> | ||||
|      Setup the <link linkend="logical-replication-config-subscriber"> | ||||
|      subscriber configurations</link> in the new subscriber. | ||||
|      <application>pg_upgrade</application> attempts to migrate subscription | ||||
|      dependencies which includes the subscription's table information present in | ||||
|      <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link> | ||||
|      system catalog and also the subscription's replication origin. This allows | ||||
|      logical replication on the new subscriber to continue from where the | ||||
|      old subscriber was up to. Migration of subscription dependencies is only | ||||
|      supported when the old cluster is version 17.0 or later. Subscription | ||||
|      dependencies on clusters before version 17.0 will silently be ignored. | ||||
|     </para> | ||||
|  | ||||
|     <para> | ||||
|      There are some prerequisites for <application>pg_upgrade</application> to | ||||
|      be able to upgrade the subscriptions. If these are not met an error | ||||
|      will be reported. | ||||
|     </para> | ||||
|  | ||||
|     <itemizedlist> | ||||
|      <listitem> | ||||
|       <para> | ||||
|        All the subscription tables in the old subscriber should be in state | ||||
|        <literal>i</literal> (initialize) or <literal>r</literal> (ready). This | ||||
|        can be verified by checking <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsubstate</structfield>. | ||||
|       </para> | ||||
|      </listitem> | ||||
|      <listitem> | ||||
|       <para> | ||||
|        The replication origin entry corresponding to each of the subscriptions | ||||
|        should exist in the old cluster. This can be found by checking | ||||
|        <link linkend="catalog-pg-subscription">pg_subscription</link> and | ||||
|        <link linkend="catalog-pg-replication-origin">pg_replication_origin</link> | ||||
|        system tables. | ||||
|       </para> | ||||
|      </listitem> | ||||
|      <listitem> | ||||
|       <para> | ||||
|        The new cluster must have | ||||
|        <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link> | ||||
|        configured to a value greater than or equal to the number of | ||||
|        subscriptions present in the old cluster. | ||||
|       </para> | ||||
|      </listitem> | ||||
|     </itemizedlist> | ||||
|    </step> | ||||
|  | ||||
|    <step> | ||||
|     <title>Stop both servers</title> | ||||
|  | ||||
|   | ||||
| @@ -228,10 +228,14 @@ textarray_to_stringlist(ArrayType *textarray) | ||||
|  | ||||
| /* | ||||
|  * Add new state record for a subscription table. | ||||
|  * | ||||
|  * If retain_lock is true, then don't release the locks taken in this function. | ||||
|  * We normally release the locks at the end of transaction but in binary-upgrade | ||||
|  * mode, we expect to release those immediately. | ||||
|  */ | ||||
| void | ||||
| AddSubscriptionRelState(Oid subid, Oid relid, char state, | ||||
| 						XLogRecPtr sublsn) | ||||
| 						XLogRecPtr sublsn, bool retain_lock) | ||||
| { | ||||
| 	Relation	rel; | ||||
| 	HeapTuple	tup; | ||||
| @@ -269,7 +273,15 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, | ||||
| 	heap_freetuple(tup); | ||||
|  | ||||
| 	/* Cleanup. */ | ||||
| 	table_close(rel, NoLock); | ||||
| 	if (retain_lock) | ||||
| 	{ | ||||
| 		table_close(rel, NoLock); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		table_close(rel, RowExclusiveLock); | ||||
| 		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| /* | ||||
|   | ||||
| @@ -773,7 +773,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, | ||||
| 										 rv->schemaname, rv->relname); | ||||
|  | ||||
| 				AddSubscriptionRelState(subid, relid, table_state, | ||||
| 										InvalidXLogRecPtr); | ||||
| 										InvalidXLogRecPtr, true); | ||||
| 			} | ||||
|  | ||||
| 			/* | ||||
| @@ -943,7 +943,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, | ||||
| 			{ | ||||
| 				AddSubscriptionRelState(sub->oid, relid, | ||||
| 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, | ||||
| 										InvalidXLogRecPtr); | ||||
| 										InvalidXLogRecPtr, true); | ||||
| 				ereport(DEBUG1, | ||||
| 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", | ||||
| 										 rv->schemaname, rv->relname, sub->name))); | ||||
|   | ||||
| @@ -11,15 +11,24 @@ | ||||
|  | ||||
| #include "postgres.h" | ||||
|  | ||||
| #include "access/relation.h" | ||||
| #include "access/table.h" | ||||
| #include "catalog/binary_upgrade.h" | ||||
| #include "catalog/heap.h" | ||||
| #include "catalog/namespace.h" | ||||
| #include "catalog/pg_subscription_rel.h" | ||||
| #include "catalog/pg_type.h" | ||||
| #include "commands/extension.h" | ||||
| #include "miscadmin.h" | ||||
| #include "replication/logical.h" | ||||
| #include "replication/origin.h" | ||||
| #include "replication/worker_internal.h" | ||||
| #include "storage/lmgr.h" | ||||
| #include "utils/array.h" | ||||
| #include "utils/builtins.h" | ||||
| #include "utils/lsyscache.h" | ||||
| #include "utils/pg_lsn.h" | ||||
| #include "utils/syscache.h" | ||||
|  | ||||
|  | ||||
| #define CHECK_IS_BINARY_UPGRADE									\ | ||||
| @@ -305,3 +314,100 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) | ||||
|  | ||||
| 	PG_RETURN_BOOL(!found_pending_wal); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * binary_upgrade_add_sub_rel_state | ||||
|  * | ||||
|  * Add the relation with the specified relation state to pg_subscription_rel | ||||
|  * catalog. | ||||
|  */ | ||||
| Datum | ||||
| binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	Relation	subrel; | ||||
| 	Relation	rel; | ||||
| 	Oid			subid; | ||||
| 	char	   *subname; | ||||
| 	Oid			relid; | ||||
| 	char		relstate; | ||||
| 	XLogRecPtr	sublsn; | ||||
|  | ||||
| 	CHECK_IS_BINARY_UPGRADE; | ||||
|  | ||||
| 	/* We must check these things before dereferencing the arguments */ | ||||
| 	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) | ||||
| 		elog(ERROR, "null argument to binary_upgrade_add_sub_rel_state is not allowed"); | ||||
|  | ||||
| 	subname = text_to_cstring(PG_GETARG_TEXT_PP(0)); | ||||
| 	relid = PG_GETARG_OID(1); | ||||
| 	relstate = PG_GETARG_CHAR(2); | ||||
| 	sublsn = PG_ARGISNULL(3) ? InvalidXLogRecPtr : PG_GETARG_LSN(3); | ||||
|  | ||||
| 	subrel = table_open(SubscriptionRelationId, RowExclusiveLock); | ||||
| 	subid = get_subscription_oid(subname, false); | ||||
| 	rel = relation_open(relid, AccessShareLock); | ||||
|  | ||||
| 	/* | ||||
| 	 * Since there are no concurrent ALTER/DROP SUBSCRIPTION commands during | ||||
| 	 * the upgrade process, and the apply worker (which builds cache based on | ||||
| 	 * the subscription catalog) is not running, the locks can be released | ||||
| 	 * immediately. | ||||
| 	 */ | ||||
| 	AddSubscriptionRelState(subid, relid, relstate, sublsn, false); | ||||
| 	relation_close(rel, AccessShareLock); | ||||
| 	table_close(subrel, RowExclusiveLock); | ||||
|  | ||||
| 	PG_RETURN_VOID(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * binary_upgrade_replorigin_advance | ||||
|  * | ||||
|  * Update the remote_lsn for the subscriber's replication origin. | ||||
|  */ | ||||
| Datum | ||||
| binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS) | ||||
| { | ||||
| 	Relation	rel; | ||||
| 	Oid			subid; | ||||
| 	char	   *subname; | ||||
| 	char		originname[NAMEDATALEN]; | ||||
| 	RepOriginId node; | ||||
| 	XLogRecPtr	remote_commit; | ||||
|  | ||||
| 	CHECK_IS_BINARY_UPGRADE; | ||||
|  | ||||
| 	/* | ||||
| 	 * We must ensure a non-NULL subscription name before dereferencing the | ||||
| 	 * arguments. | ||||
| 	 */ | ||||
| 	if (PG_ARGISNULL(0)) | ||||
| 		elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed"); | ||||
|  | ||||
| 	subname = text_to_cstring(PG_GETARG_TEXT_PP(0)); | ||||
| 	remote_commit = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1); | ||||
|  | ||||
| 	rel = table_open(SubscriptionRelationId, RowExclusiveLock); | ||||
| 	subid = get_subscription_oid(subname, false); | ||||
|  | ||||
| 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); | ||||
|  | ||||
| 	/* Lock to prevent the replication origin from vanishing */ | ||||
| 	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); | ||||
| 	node = replorigin_by_name(originname, false); | ||||
|  | ||||
| 	/* | ||||
| 	 * The server will be stopped after setting up the objects in the new | ||||
| 	 * cluster and the origins will be flushed during the shutdown checkpoint. | ||||
| 	 * This will ensure that the latest LSN values for origin will be | ||||
| 	 * available after the upgrade. | ||||
| 	 */ | ||||
| 	replorigin_advance(node, remote_commit, InvalidXLogRecPtr, | ||||
| 					   false /* backward */ , | ||||
| 					   false /* WAL log */ ); | ||||
|  | ||||
| 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); | ||||
| 	table_close(rel, RowExclusiveLock); | ||||
|  | ||||
| 	PG_RETURN_VOID(); | ||||
| } | ||||
|   | ||||
| @@ -24,6 +24,7 @@ | ||||
| #include "catalog/pg_operator_d.h" | ||||
| #include "catalog/pg_proc_d.h" | ||||
| #include "catalog/pg_publication_d.h" | ||||
| #include "catalog/pg_subscription_d.h" | ||||
| #include "catalog/pg_type_d.h" | ||||
| #include "common/hashfn.h" | ||||
| #include "fe_utils/string_utils.h" | ||||
| @@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr) | ||||
| 	pg_log_info("reading subscriptions"); | ||||
| 	getSubscriptions(fout); | ||||
|  | ||||
| 	pg_log_info("reading subscription membership of tables"); | ||||
| 	getSubscriptionTables(fout); | ||||
|  | ||||
| 	free(inhinfo);				/* not needed any longer */ | ||||
|  | ||||
| 	*numTablesPtr = numTables; | ||||
| @@ -978,6 +982,24 @@ findPublicationByOid(Oid oid) | ||||
| 	return (PublicationInfo *) dobj; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * findSubscriptionByOid | ||||
|  *	  finds the DumpableObject for the subscription with the given oid | ||||
|  *	  returns NULL if not found | ||||
|  */ | ||||
| SubscriptionInfo * | ||||
| findSubscriptionByOid(Oid oid) | ||||
| { | ||||
| 	CatalogId	catId; | ||||
| 	DumpableObject *dobj; | ||||
|  | ||||
| 	catId.tableoid = SubscriptionRelationId; | ||||
| 	catId.oid = oid; | ||||
| 	dobj = findObjectByCatalogId(catId); | ||||
| 	Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION); | ||||
| 	return (SubscriptionInfo *) dobj; | ||||
| } | ||||
|  | ||||
|  | ||||
| /* | ||||
|  * recordExtensionMembership | ||||
|   | ||||
| @@ -297,6 +297,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo); | ||||
| static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo); | ||||
| static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo); | ||||
| static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo); | ||||
| static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo); | ||||
| static void dumpDatabase(Archive *fout); | ||||
| static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf, | ||||
| 							   const char *dbname, Oid dboid); | ||||
| @@ -4638,6 +4639,8 @@ getSubscriptions(Archive *fout) | ||||
| 	int			i_subsynccommit; | ||||
| 	int			i_subpublications; | ||||
| 	int			i_suborigin; | ||||
| 	int			i_suboriginremotelsn; | ||||
| 	int			i_subenabled; | ||||
| 	int			i, | ||||
| 				ntups; | ||||
|  | ||||
| @@ -4693,16 +4696,30 @@ getSubscriptions(Archive *fout) | ||||
| 		appendPQExpBufferStr(query, | ||||
| 							 " s.subpasswordrequired,\n" | ||||
| 							 " s.subrunasowner,\n" | ||||
| 							 " s.suborigin\n"); | ||||
| 							 " s.suborigin,\n"); | ||||
| 	else | ||||
| 		appendPQExpBuffer(query, | ||||
| 						  " 't' AS subpasswordrequired,\n" | ||||
| 						  " 't' AS subrunasowner,\n" | ||||
| 						  " '%s' AS suborigin\n", | ||||
| 						  " '%s' AS suborigin,\n", | ||||
| 						  LOGICALREP_ORIGIN_ANY); | ||||
|  | ||||
| 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000) | ||||
| 		appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n" | ||||
| 							 " s.subenabled\n"); | ||||
| 	else | ||||
| 		appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n" | ||||
| 							 " false AS subenabled\n"); | ||||
|  | ||||
| 	appendPQExpBufferStr(query, | ||||
| 						 "FROM pg_subscription s\n"); | ||||
|  | ||||
| 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000) | ||||
| 		appendPQExpBufferStr(query, | ||||
| 							 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n" | ||||
| 							 "    ON o.external_id = 'pg_' || s.oid::text \n"); | ||||
|  | ||||
| 	appendPQExpBufferStr(query, | ||||
| 						 "FROM pg_subscription s\n" | ||||
| 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n" | ||||
| 						 "                   WHERE datname = current_database())"); | ||||
|  | ||||
| @@ -4729,6 +4746,8 @@ getSubscriptions(Archive *fout) | ||||
| 	i_subsynccommit = PQfnumber(res, "subsynccommit"); | ||||
| 	i_subpublications = PQfnumber(res, "subpublications"); | ||||
| 	i_suborigin = PQfnumber(res, "suborigin"); | ||||
| 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); | ||||
| 	i_subenabled = PQfnumber(res, "subenabled"); | ||||
|  | ||||
| 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); | ||||
|  | ||||
| @@ -4766,6 +4785,13 @@ getSubscriptions(Archive *fout) | ||||
| 		subinfo[i].subpublications = | ||||
| 			pg_strdup(PQgetvalue(res, i, i_subpublications)); | ||||
| 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); | ||||
| 		if (PQgetisnull(res, i, i_suboriginremotelsn)) | ||||
| 			subinfo[i].suboriginremotelsn = NULL; | ||||
| 		else | ||||
| 			subinfo[i].suboriginremotelsn = | ||||
| 				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn)); | ||||
| 		subinfo[i].subenabled = | ||||
| 			pg_strdup(PQgetvalue(res, i, i_subenabled)); | ||||
|  | ||||
| 		/* Decide whether we want to dump it */ | ||||
| 		selectDumpableObject(&(subinfo[i].dobj), fout); | ||||
| @@ -4775,6 +4801,162 @@ getSubscriptions(Archive *fout) | ||||
| 	destroyPQExpBuffer(query); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * getSubscriptionTables | ||||
|  *	  Get information about subscription membership for dumpable tables. This | ||||
|  *    will be used only in binary-upgrade mode for PG17 or later versions. | ||||
|  */ | ||||
| void | ||||
| getSubscriptionTables(Archive *fout) | ||||
| { | ||||
| 	DumpOptions *dopt = fout->dopt; | ||||
| 	SubscriptionInfo *subinfo = NULL; | ||||
| 	SubRelInfo *subrinfo; | ||||
| 	PGresult   *res; | ||||
| 	int			i_srsubid; | ||||
| 	int			i_srrelid; | ||||
| 	int			i_srsubstate; | ||||
| 	int			i_srsublsn; | ||||
| 	int			ntups; | ||||
| 	Oid			last_srsubid = InvalidOid; | ||||
|  | ||||
| 	if (dopt->no_subscriptions || !dopt->binary_upgrade || | ||||
| 		fout->remoteVersion < 170000) | ||||
| 		return; | ||||
|  | ||||
| 	res = ExecuteSqlQuery(fout, | ||||
| 						  "SELECT srsubid, srrelid, srsubstate, srsublsn " | ||||
| 						  "FROM pg_catalog.pg_subscription_rel " | ||||
| 						  "ORDER BY srsubid", | ||||
| 						  PGRES_TUPLES_OK); | ||||
| 	ntups = PQntuples(res); | ||||
| 	if (ntups == 0) | ||||
| 		goto cleanup; | ||||
|  | ||||
| 	/* Get pg_subscription_rel attributes */ | ||||
| 	i_srsubid = PQfnumber(res, "srsubid"); | ||||
| 	i_srrelid = PQfnumber(res, "srrelid"); | ||||
| 	i_srsubstate = PQfnumber(res, "srsubstate"); | ||||
| 	i_srsublsn = PQfnumber(res, "srsublsn"); | ||||
|  | ||||
| 	subrinfo = pg_malloc(ntups * sizeof(SubRelInfo)); | ||||
| 	for (int i = 0; i < ntups; i++) | ||||
| 	{ | ||||
| 		Oid			cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid)); | ||||
| 		Oid			relid = atooid(PQgetvalue(res, i, i_srrelid)); | ||||
| 		TableInfo  *tblinfo; | ||||
|  | ||||
| 		/* | ||||
| 		 * If we switched to a new subscription, check if the subscription | ||||
| 		 * exists. | ||||
| 		 */ | ||||
| 		if (cur_srsubid != last_srsubid) | ||||
| 		{ | ||||
| 			subinfo = findSubscriptionByOid(cur_srsubid); | ||||
| 			if (subinfo == NULL) | ||||
| 				pg_fatal("subscription with OID %u does not exist", cur_srsubid); | ||||
|  | ||||
| 			last_srsubid = cur_srsubid; | ||||
| 		} | ||||
|  | ||||
| 		tblinfo = findTableByOid(relid); | ||||
| 		if (tblinfo == NULL) | ||||
| 			pg_fatal("failed sanity check, table with OID %u not found", | ||||
| 					 relid); | ||||
|  | ||||
| 		/* OK, make a DumpableObject for this relationship */ | ||||
| 		subrinfo[i].dobj.objType = DO_SUBSCRIPTION_REL; | ||||
| 		subrinfo[i].dobj.catId.tableoid = relid; | ||||
| 		subrinfo[i].dobj.catId.oid = cur_srsubid; | ||||
| 		AssignDumpId(&subrinfo[i].dobj); | ||||
| 		subrinfo[i].dobj.name = pg_strdup(subinfo->dobj.name); | ||||
| 		subrinfo[i].tblinfo = tblinfo; | ||||
| 		subrinfo[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0]; | ||||
| 		if (PQgetisnull(res, i, i_srsublsn)) | ||||
| 			subrinfo[i].srsublsn = NULL; | ||||
| 		else | ||||
| 			subrinfo[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn)); | ||||
|  | ||||
| 		subrinfo[i].subinfo = subinfo; | ||||
|  | ||||
| 		/* Decide whether we want to dump it */ | ||||
| 		selectDumpableObject(&(subrinfo[i].dobj), fout); | ||||
| 	} | ||||
|  | ||||
| cleanup: | ||||
| 	PQclear(res); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * dumpSubscriptionTable | ||||
|  *	  Dump the definition of the given subscription table mapping. This will be | ||||
|  *    used only in binary-upgrade mode for PG17 or later versions. | ||||
|  */ | ||||
| static void | ||||
| dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo) | ||||
| { | ||||
| 	DumpOptions *dopt = fout->dopt; | ||||
| 	SubscriptionInfo *subinfo = subrinfo->subinfo; | ||||
| 	PQExpBuffer query; | ||||
| 	char	   *tag; | ||||
|  | ||||
| 	/* Do nothing in data-only dump */ | ||||
| 	if (dopt->dataOnly) | ||||
| 		return; | ||||
|  | ||||
| 	Assert(fout->dopt->binary_upgrade && fout->remoteVersion >= 170000); | ||||
|  | ||||
| 	tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name); | ||||
|  | ||||
| 	query = createPQExpBuffer(); | ||||
|  | ||||
| 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * binary_upgrade_add_sub_rel_state will add the subscription relation | ||||
| 		 * to pg_subscription_rel table. This will be used only in | ||||
| 		 * binary-upgrade mode. | ||||
| 		 */ | ||||
| 		appendPQExpBufferStr(query, | ||||
| 							 "\n-- For binary upgrade, must preserve the subscriber table.\n"); | ||||
| 		appendPQExpBufferStr(query, | ||||
| 							 "SELECT pg_catalog.binary_upgrade_add_sub_rel_state("); | ||||
| 		appendStringLiteralAH(query, subrinfo->dobj.name, fout); | ||||
| 		appendPQExpBuffer(query, | ||||
| 						  ", %u, '%c'", | ||||
| 						  subrinfo->tblinfo->dobj.catId.oid, | ||||
| 						  subrinfo->srsubstate); | ||||
|  | ||||
| 		if (subrinfo->srsublsn && subrinfo->srsublsn[0] != '\0') | ||||
| 			appendPQExpBuffer(query, ", '%s'", subrinfo->srsublsn); | ||||
| 		else | ||||
| 			appendPQExpBuffer(query, ", NULL"); | ||||
|  | ||||
| 		appendPQExpBufferStr(query, ");\n"); | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * There is no point in creating a drop query as the drop is done by table | ||||
| 	 * drop.  (If you think to change this, see also _printTocEntry().) | ||||
| 	 * Although this object doesn't really have ownership as such, set the | ||||
| 	 * owner field anyway to ensure that the command is run by the correct | ||||
| 	 * role at restore time. | ||||
| 	 */ | ||||
| 	if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) | ||||
| 		ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId, | ||||
| 					 ARCHIVE_OPTS(.tag = tag, | ||||
| 								  .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name, | ||||
| 								  .owner = subinfo->rolname, | ||||
| 								  .description = "SUBSCRIPTION TABLE", | ||||
| 								  .section = SECTION_POST_DATA, | ||||
| 								  .createStmt = query->data)); | ||||
|  | ||||
| 	/* These objects can't currently have comments or seclabels */ | ||||
|  | ||||
| 	free(tag); | ||||
| 	destroyPQExpBuffer(query); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * dumpSubscription | ||||
|  *	  dump the definition of the given subscription | ||||
| @@ -4855,6 +5037,43 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) | ||||
|  | ||||
| 	appendPQExpBufferStr(query, ");\n"); | ||||
|  | ||||
| 	/* | ||||
| 	 * In binary-upgrade mode, we allow the replication to continue after the | ||||
| 	 * upgrade. | ||||
| 	 */ | ||||
| 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000) | ||||
| 	{ | ||||
| 		if (subinfo->suboriginremotelsn) | ||||
| 		{ | ||||
| 			/* | ||||
| 			 * Preserve the remote_lsn for the subscriber's replication | ||||
| 			 * origin. This value is required to start the replication from | ||||
| 			 * the position before the upgrade. This value will be stale if | ||||
| 			 * the publisher gets upgraded before the subscriber node. | ||||
| 			 * However, this shouldn't be a problem as the upgrade of the | ||||
| 			 * publisher ensures that all the transactions were replicated | ||||
| 			 * before upgrading it. | ||||
| 			 */ | ||||
| 			appendPQExpBufferStr(query, | ||||
| 								 "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n"); | ||||
| 			appendPQExpBufferStr(query, | ||||
| 								 "SELECT pg_catalog.binary_upgrade_replorigin_advance("); | ||||
| 			appendStringLiteralAH(query, subinfo->dobj.name, fout); | ||||
| 			appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn); | ||||
| 		} | ||||
|  | ||||
| 		if (strcmp(subinfo->subenabled, "t") == 0) | ||||
| 		{ | ||||
| 			/* | ||||
| 			 * Enable the subscription to allow the replication to continue | ||||
| 			 * after the upgrade. | ||||
| 			 */ | ||||
| 			appendPQExpBufferStr(query, | ||||
| 								 "\n-- For binary upgrade, must preserve the subscriber's running state.\n"); | ||||
| 			appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE;\n", qsubname); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) | ||||
| 		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, | ||||
| 					 ARCHIVE_OPTS(.tag = subinfo->dobj.name, | ||||
| @@ -10477,6 +10696,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) | ||||
| 		case DO_SUBSCRIPTION: | ||||
| 			dumpSubscription(fout, (const SubscriptionInfo *) dobj); | ||||
| 			break; | ||||
| 		case DO_SUBSCRIPTION_REL: | ||||
| 			dumpSubscriptionTable(fout, (const SubRelInfo *) dobj); | ||||
| 			break; | ||||
| 		case DO_PRE_DATA_BOUNDARY: | ||||
| 		case DO_POST_DATA_BOUNDARY: | ||||
| 			/* never dumped, nothing to do */ | ||||
| @@ -18543,6 +18765,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, | ||||
| 			case DO_PUBLICATION_REL: | ||||
| 			case DO_PUBLICATION_TABLE_IN_SCHEMA: | ||||
| 			case DO_SUBSCRIPTION: | ||||
| 			case DO_SUBSCRIPTION_REL: | ||||
| 				/* Post-data objects: must come after the post-data boundary */ | ||||
| 				addObjectDependency(dobj, postDataBound->dumpId); | ||||
| 				break; | ||||
|   | ||||
| @@ -83,6 +83,7 @@ typedef enum | ||||
| 	DO_PUBLICATION_REL, | ||||
| 	DO_PUBLICATION_TABLE_IN_SCHEMA, | ||||
| 	DO_SUBSCRIPTION, | ||||
| 	DO_SUBSCRIPTION_REL,		/* see note for SubRelInfo */ | ||||
| } DumpableObjectType; | ||||
|  | ||||
| /* | ||||
| @@ -661,6 +662,7 @@ typedef struct _SubscriptionInfo | ||||
| { | ||||
| 	DumpableObject dobj; | ||||
| 	const char *rolname; | ||||
| 	char	   *subenabled; | ||||
| 	char	   *subbinary; | ||||
| 	char	   *substream; | ||||
| 	char	   *subtwophasestate; | ||||
| @@ -672,8 +674,28 @@ typedef struct _SubscriptionInfo | ||||
| 	char	   *subsynccommit; | ||||
| 	char	   *subpublications; | ||||
| 	char	   *suborigin; | ||||
| 	char	   *suboriginremotelsn; | ||||
| } SubscriptionInfo; | ||||
|  | ||||
| /* | ||||
|  * The SubRelInfo struct is used to represent a subscription relation. | ||||
|  * | ||||
|  * XXX Currently, the subscription tables are added to the subscription after | ||||
|  * enabling the subscription in binary-upgrade mode. As the apply workers will | ||||
|  * not be started in binary_upgrade mode the ordering of enable subscription | ||||
|  * does not matter. The order of adding the subscription tables to the | ||||
|  * subscription and enabling the subscription should be taken care of if this | ||||
|  * feature will be supported in a non-binary-upgrade mode in the future. | ||||
|  */ | ||||
| typedef struct _SubRelInfo | ||||
| { | ||||
| 	DumpableObject dobj; | ||||
| 	SubscriptionInfo *subinfo; | ||||
| 	TableInfo  *tblinfo; | ||||
| 	char		srsubstate; | ||||
| 	char	   *srsublsn; | ||||
| } SubRelInfo; | ||||
|  | ||||
| /* | ||||
|  *	common utility functions | ||||
|  */ | ||||
| @@ -698,6 +720,7 @@ extern CollInfo *findCollationByOid(Oid oid); | ||||
| extern NamespaceInfo *findNamespaceByOid(Oid oid); | ||||
| extern ExtensionInfo *findExtensionByOid(Oid oid); | ||||
| extern PublicationInfo *findPublicationByOid(Oid oid); | ||||
| extern SubscriptionInfo *findSubscriptionByOid(Oid oid); | ||||
|  | ||||
| extern void recordExtensionMembership(CatalogId catId, ExtensionInfo *ext); | ||||
| extern ExtensionInfo *findOwningExtension(CatalogId catalogId); | ||||
| @@ -757,5 +780,6 @@ extern void getPublicationNamespaces(Archive *fout); | ||||
| extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], | ||||
| 								 int numTables); | ||||
| extern void getSubscriptions(Archive *fout); | ||||
| extern void getSubscriptionTables(Archive *fout); | ||||
|  | ||||
| #endif							/* PG_DUMP_H */ | ||||
|   | ||||
| @@ -94,6 +94,7 @@ enum dbObjectTypePriorities | ||||
| 	PRIO_PUBLICATION_REL, | ||||
| 	PRIO_PUBLICATION_TABLE_IN_SCHEMA, | ||||
| 	PRIO_SUBSCRIPTION, | ||||
| 	PRIO_SUBSCRIPTION_REL, | ||||
| 	PRIO_DEFAULT_ACL,			/* done in ACL pass */ | ||||
| 	PRIO_EVENT_TRIGGER,			/* must be next to last! */ | ||||
| 	PRIO_REFRESH_MATVIEW		/* must be last! */ | ||||
| @@ -147,10 +148,11 @@ static const int dbObjectTypePriority[] = | ||||
| 	PRIO_PUBLICATION,			/* DO_PUBLICATION */ | ||||
| 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */ | ||||
| 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */ | ||||
| 	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */ | ||||
| 	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */ | ||||
| 	PRIO_SUBSCRIPTION_REL		/* DO_SUBSCRIPTION_REL */ | ||||
| }; | ||||
|  | ||||
| StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1), | ||||
| StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION_REL + 1), | ||||
| 				 "array length mismatch"); | ||||
|  | ||||
| static DumpId preDataBoundId; | ||||
| @@ -1472,6 +1474,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) | ||||
| 					 "SUBSCRIPTION (ID %d OID %u)", | ||||
| 					 obj->dumpId, obj->catId.oid); | ||||
| 			return; | ||||
| 		case DO_SUBSCRIPTION_REL: | ||||
| 			snprintf(buf, bufsize, | ||||
| 					 "SUBSCRIPTION TABLE (ID %d OID %u)", | ||||
| 					 obj->dumpId, obj->catId.oid); | ||||
| 			return; | ||||
| 		case DO_PRE_DATA_BOUNDARY: | ||||
| 			snprintf(buf, bufsize, | ||||
| 					 "PRE-DATA BOUNDARY  (ID %d)", | ||||
|   | ||||
| @@ -34,7 +34,9 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster); | ||||
| static void check_for_new_tablespace_dir(void); | ||||
| static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); | ||||
| static void check_new_cluster_logical_replication_slots(void); | ||||
| static void check_new_cluster_subscription_configuration(void); | ||||
| static void check_old_cluster_for_valid_slots(bool live_check); | ||||
| static void check_old_cluster_subscription_state(void); | ||||
|  | ||||
|  | ||||
| /* | ||||
| @@ -112,13 +114,21 @@ check_and_dump_old_cluster(bool live_check) | ||||
| 	check_for_reg_data_type_usage(&old_cluster); | ||||
| 	check_for_isn_and_int8_passing_mismatch(&old_cluster); | ||||
|  | ||||
| 	/* | ||||
| 	 * Logical replication slots can be migrated since PG17. See comments atop | ||||
| 	 * get_old_cluster_logical_slot_infos(). | ||||
| 	 */ | ||||
| 	if (GET_MAJOR_VERSION(old_cluster.major_version) >= 1700) | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * Logical replication slots can be migrated since PG17. See comments | ||||
| 		 * atop get_old_cluster_logical_slot_infos(). | ||||
| 		 */ | ||||
| 		check_old_cluster_for_valid_slots(live_check); | ||||
|  | ||||
| 		/* | ||||
| 		 * Subscriptions and their dependencies can be migrated since PG17. | ||||
| 		 * See comments atop get_db_subscription_count(). | ||||
| 		 */ | ||||
| 		check_old_cluster_subscription_state(); | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * PG 16 increased the size of the 'aclitem' type, which breaks the | ||||
| 	 * on-disk format for existing data. | ||||
| @@ -237,6 +247,8 @@ check_new_cluster(void) | ||||
| 	check_for_new_tablespace_dir(); | ||||
|  | ||||
| 	check_new_cluster_logical_replication_slots(); | ||||
|  | ||||
| 	check_new_cluster_subscription_configuration(); | ||||
| } | ||||
|  | ||||
|  | ||||
| @@ -1538,6 +1550,53 @@ check_new_cluster_logical_replication_slots(void) | ||||
| 	check_ok(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * check_new_cluster_subscription_configuration() | ||||
|  * | ||||
|  * Verify that the max_replication_slots configuration specified is enough for | ||||
|  * creating the subscriptions. This is required to create the replication | ||||
|  * origin for each subscription. | ||||
|  */ | ||||
| static void | ||||
| check_new_cluster_subscription_configuration(void) | ||||
| { | ||||
| 	PGresult   *res; | ||||
| 	PGconn	   *conn; | ||||
| 	int			nsubs_on_old; | ||||
| 	int			max_replication_slots; | ||||
|  | ||||
| 	/* Subscriptions and their dependencies can be migrated since PG17. */ | ||||
| 	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) | ||||
| 		return; | ||||
|  | ||||
| 	nsubs_on_old = count_old_cluster_subscriptions(); | ||||
|  | ||||
| 	/* Quick return if there are no subscriptions to be migrated. */ | ||||
| 	if (nsubs_on_old == 0) | ||||
| 		return; | ||||
|  | ||||
| 	prep_status("Checking for new cluster configuration for subscriptions"); | ||||
|  | ||||
| 	conn = connectToServer(&new_cluster, "template1"); | ||||
|  | ||||
| 	res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings " | ||||
| 							"WHERE name = 'max_replication_slots';"); | ||||
|  | ||||
| 	if (PQntuples(res) != 1) | ||||
| 		pg_fatal("could not determine parameter settings on new cluster"); | ||||
|  | ||||
| 	max_replication_slots = atoi(PQgetvalue(res, 0, 0)); | ||||
| 	if (nsubs_on_old > max_replication_slots) | ||||
| 		pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of " | ||||
| 				 "subscriptions (%d) on the old cluster", | ||||
| 				 max_replication_slots, nsubs_on_old); | ||||
|  | ||||
| 	PQclear(res); | ||||
| 	PQfinish(conn); | ||||
|  | ||||
| 	check_ok(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * check_old_cluster_for_valid_slots() | ||||
|  * | ||||
| @@ -1613,3 +1672,129 @@ check_old_cluster_for_valid_slots(bool live_check) | ||||
|  | ||||
| 	check_ok(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * check_old_cluster_subscription_state() | ||||
|  * | ||||
|  * Verify that the replication origin corresponding to each of the | ||||
|  * subscriptions are present and each of the subscribed tables is in | ||||
|  * 'i' (initialize) or 'r' (ready) state. | ||||
|  */ | ||||
| static void | ||||
| check_old_cluster_subscription_state(void) | ||||
| { | ||||
| 	FILE	   *script = NULL; | ||||
| 	char		output_path[MAXPGPATH]; | ||||
| 	int			ntup; | ||||
|  | ||||
| 	prep_status("Checking for subscription state"); | ||||
|  | ||||
| 	snprintf(output_path, sizeof(output_path), "%s/%s", | ||||
| 			 log_opts.basedir, | ||||
| 			 "subs_invalid.txt"); | ||||
| 	for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) | ||||
| 	{ | ||||
| 		PGresult   *res; | ||||
| 		DbInfo	   *active_db = &old_cluster.dbarr.dbs[dbnum]; | ||||
| 		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name); | ||||
|  | ||||
| 		/* We need to check for pg_replication_origin only once. */ | ||||
| 		if (dbnum == 0) | ||||
| 		{ | ||||
| 			/* | ||||
| 			 * Check that all the subscriptions have their respective | ||||
| 			 * replication origin. | ||||
| 			 */ | ||||
| 			res = executeQueryOrDie(conn, | ||||
| 									"SELECT d.datname, s.subname " | ||||
| 									"FROM pg_catalog.pg_subscription s " | ||||
| 									"LEFT OUTER JOIN pg_catalog.pg_replication_origin o " | ||||
| 									"	ON o.roname = 'pg_' || s.oid " | ||||
| 									"INNER JOIN pg_catalog.pg_database d " | ||||
| 									"	ON d.oid = s.subdbid " | ||||
| 									"WHERE o.roname iS NULL;"); | ||||
|  | ||||
| 			ntup = PQntuples(res); | ||||
| 			for (int i = 0; i < ntup; i++) | ||||
| 			{ | ||||
| 				if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) | ||||
| 					pg_fatal("could not open file \"%s\": %s", | ||||
| 							 output_path, strerror(errno)); | ||||
| 				fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", | ||||
| 						PQgetvalue(res, i, 0), | ||||
| 						PQgetvalue(res, i, 1)); | ||||
| 			} | ||||
| 			PQclear(res); | ||||
| 		} | ||||
|  | ||||
| 		/* | ||||
| 		 * We don't allow upgrade if there is a risk of dangling slot or | ||||
| 		 * origin corresponding to initial sync after upgrade. | ||||
| 		 * | ||||
| 		 * A slot/origin not created yet refers to the 'i' (initialize) state, | ||||
| 		 * while 'r' (ready) state refers to a slot/origin created previously | ||||
| 		 * but already dropped. These states are supported for pg_upgrade. The | ||||
| 		 * other states listed below are not supported: | ||||
| 		 * | ||||
| 		 * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state | ||||
| 		 * would retain a replication slot, which could not be dropped by the | ||||
| 		 * sync worker spawned after the upgrade because the subscription ID | ||||
| 		 * used for the slot name won't match anymore. | ||||
| 		 * | ||||
| 		 * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state | ||||
| 		 * would retain the replication origin when there is a failure in | ||||
| 		 * tablesync worker immediately after dropping the replication slot in | ||||
| 		 * the publisher. | ||||
| 		 * | ||||
| 		 * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on | ||||
| 		 * a relation upgraded while in this state would expect an origin ID | ||||
| 		 * with the OID of the subscription used before the upgrade, causing | ||||
| 		 * it to fail. | ||||
| 		 * | ||||
| 		 * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and | ||||
| 		 * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, | ||||
| 		 * so we need not allow these states. | ||||
| 		 */ | ||||
| 		res = executeQueryOrDie(conn, | ||||
| 								"SELECT r.srsubstate, s.subname, n.nspname, c.relname " | ||||
| 								"FROM pg_catalog.pg_subscription_rel r " | ||||
| 								"LEFT JOIN pg_catalog.pg_subscription s" | ||||
| 								"	ON r.srsubid = s.oid " | ||||
| 								"LEFT JOIN pg_catalog.pg_class c" | ||||
| 								"	ON r.srrelid = c.oid " | ||||
| 								"LEFT JOIN pg_catalog.pg_namespace n" | ||||
| 								"	ON c.relnamespace = n.oid " | ||||
| 								"WHERE r.srsubstate NOT IN ('i', 'r') " | ||||
| 								"ORDER BY s.subname"); | ||||
|  | ||||
| 		ntup = PQntuples(res); | ||||
| 		for (int i = 0; i < ntup; i++) | ||||
| 		{ | ||||
| 			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) | ||||
| 				pg_fatal("could not open file \"%s\": %s", | ||||
| 						 output_path, strerror(errno)); | ||||
|  | ||||
| 			fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", | ||||
| 					PQgetvalue(res, i, 0), | ||||
| 					active_db->db_name, | ||||
| 					PQgetvalue(res, i, 1), | ||||
| 					PQgetvalue(res, i, 2), | ||||
| 					PQgetvalue(res, i, 3)); | ||||
| 		} | ||||
|  | ||||
| 		PQclear(res); | ||||
| 		PQfinish(conn); | ||||
| 	} | ||||
|  | ||||
| 	if (script) | ||||
| 	{ | ||||
| 		fclose(script); | ||||
| 		pg_log(PG_REPORT, "fatal"); | ||||
| 		pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" | ||||
| 				 "You can allow the initial sync to finish for all relations and then restart the upgrade.\n" | ||||
| 				 "A list of the problematic subscriptions is in the file:\n" | ||||
| 				 "    %s", output_path); | ||||
| 	} | ||||
| 	else | ||||
| 		check_ok(); | ||||
| } | ||||
|   | ||||
| @@ -28,6 +28,7 @@ static void print_db_infos(DbInfoArr *db_arr); | ||||
| static void print_rel_infos(RelInfoArr *rel_arr); | ||||
| static void print_slot_infos(LogicalSlotInfoArr *slot_arr); | ||||
| static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check); | ||||
| static void get_db_subscription_count(DbInfo *dbinfo); | ||||
|  | ||||
|  | ||||
| /* | ||||
| @@ -293,10 +294,14 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) | ||||
| 		get_rel_infos(cluster, pDbInfo); | ||||
|  | ||||
| 		/* | ||||
| 		 * Retrieve the logical replication slots infos for the old cluster. | ||||
| 		 * Retrieve the logical replication slots infos and the subscriptions | ||||
| 		 * count for the old cluster. | ||||
| 		 */ | ||||
| 		if (cluster == &old_cluster) | ||||
| 		{ | ||||
| 			get_old_cluster_logical_slot_infos(pDbInfo, live_check); | ||||
| 			get_db_subscription_count(pDbInfo); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if (cluster == &old_cluster) | ||||
| @@ -730,6 +735,55 @@ count_old_cluster_logical_slots(void) | ||||
| 	return slot_count; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * get_db_subscription_count() | ||||
|  * | ||||
|  * Gets the number of subscriptions in the database referred to by "dbinfo". | ||||
|  * | ||||
|  * Note: This function will not do anything if the old cluster is pre-PG17. | ||||
|  * This is because before that the logical slots are not upgraded, so we will | ||||
|  * not be able to upgrade the logical replication clusters completely. | ||||
|  */ | ||||
| static void | ||||
| get_db_subscription_count(DbInfo *dbinfo) | ||||
| { | ||||
| 	PGconn	   *conn; | ||||
| 	PGresult   *res; | ||||
|  | ||||
| 	/* Subscriptions can be migrated since PG17. */ | ||||
| 	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) | ||||
| 		return; | ||||
|  | ||||
| 	conn = connectToServer(&old_cluster, dbinfo->db_name); | ||||
| 	res = executeQueryOrDie(conn, "SELECT count(*) " | ||||
| 							"FROM pg_catalog.pg_subscription WHERE subdbid = %d", | ||||
| 							dbinfo->db_oid); | ||||
| 	dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0)); | ||||
|  | ||||
| 	PQclear(res); | ||||
| 	PQfinish(conn); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * count_old_cluster_subscriptions() | ||||
|  * | ||||
|  * Returns the number of subscriptions for all databases. | ||||
|  * | ||||
|  * Note: this function always returns 0 if the old_cluster is PG16 and prior | ||||
|  * because we gather subscriptions only for cluster versions greater than or | ||||
|  * equal to PG17. See get_db_subscription_count(). | ||||
|  */ | ||||
| int | ||||
| count_old_cluster_subscriptions(void) | ||||
| { | ||||
| 	int			nsubs = 0; | ||||
|  | ||||
| 	for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) | ||||
| 		nsubs += old_cluster.dbarr.dbs[dbnum].nsubs; | ||||
|  | ||||
| 	return nsubs; | ||||
| } | ||||
|  | ||||
| static void | ||||
| free_db_and_rel_infos(DbInfoArr *db_arr) | ||||
| { | ||||
|   | ||||
| @@ -43,6 +43,7 @@ tests += { | ||||
|       't/001_basic.pl', | ||||
|       't/002_pg_upgrade.pl', | ||||
|       't/003_logical_slots.pl', | ||||
|       't/004_subscription.pl', | ||||
|     ], | ||||
|     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow | ||||
|   }, | ||||
|   | ||||
| @@ -195,6 +195,7 @@ typedef struct | ||||
| 											 * path */ | ||||
| 	RelInfoArr	rel_arr;		/* array of all user relinfos */ | ||||
| 	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */ | ||||
| 	int			nsubs;			/* number of subscriptions */ | ||||
| } DbInfo; | ||||
|  | ||||
| /* | ||||
| @@ -421,6 +422,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, | ||||
| 							  const char *new_pgdata); | ||||
| void		get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check); | ||||
| int			count_old_cluster_logical_slots(void); | ||||
| int			count_old_cluster_subscriptions(void); | ||||
|  | ||||
| /* option.c */ | ||||
|  | ||||
|   | ||||
							
								
								
									
										319
									
								
								src/bin/pg_upgrade/t/004_subscription.pl
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										319
									
								
								src/bin/pg_upgrade/t/004_subscription.pl
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,319 @@ | ||||
| # Copyright (c) 2023, PostgreSQL Global Development Group | ||||
|  | ||||
| # Test for pg_upgrade of logical subscription | ||||
| use strict; | ||||
| use warnings; | ||||
|  | ||||
| use File::Find qw(find); | ||||
|  | ||||
| use PostgreSQL::Test::Cluster; | ||||
| use PostgreSQL::Test::Utils; | ||||
| use Test::More; | ||||
|  | ||||
| # Can be changed to test the other modes. | ||||
| my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy'; | ||||
|  | ||||
| # Initialize publisher node | ||||
| my $publisher = PostgreSQL::Test::Cluster->new('publisher'); | ||||
| $publisher->init(allows_streaming => 'logical'); | ||||
| $publisher->start; | ||||
|  | ||||
| # Initialize the old subscriber node | ||||
| my $old_sub = PostgreSQL::Test::Cluster->new('old_sub'); | ||||
| $old_sub->init; | ||||
| $old_sub->start; | ||||
| my $oldbindir = $old_sub->config_data('--bindir'); | ||||
|  | ||||
| # Initialize the new subscriber | ||||
| my $new_sub = PostgreSQL::Test::Cluster->new('new_sub'); | ||||
| $new_sub->init; | ||||
| my $newbindir = $new_sub->config_data('--bindir'); | ||||
|  | ||||
| # In a VPATH build, we'll be started in the source directory, but we want | ||||
| # to run pg_upgrade in the build directory so that any files generated finish | ||||
| # in it, like delete_old_cluster.{sh,bat}. | ||||
| chdir ${PostgreSQL::Test::Utils::tmp_check}; | ||||
|  | ||||
| # Initial setup | ||||
| $publisher->safe_psql( | ||||
| 	'postgres', qq[ | ||||
| 		CREATE TABLE tab_upgraded1(id int); | ||||
| 		CREATE TABLE tab_upgraded2(id int); | ||||
| ]); | ||||
| $old_sub->safe_psql( | ||||
| 	'postgres', qq[ | ||||
| 		CREATE TABLE tab_upgraded1(id int); | ||||
| 		CREATE TABLE tab_upgraded2(id int); | ||||
| ]); | ||||
|  | ||||
| # Setup logical replication | ||||
| my $connstr = $publisher->connstr . ' dbname=postgres'; | ||||
|  | ||||
| # Setup an enabled subscription to verify that the running status is retained | ||||
| # after upgrade. | ||||
| $publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1"); | ||||
| $old_sub->safe_psql('postgres', | ||||
| 	"CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1" | ||||
| ); | ||||
| $old_sub->wait_for_subscription_sync($publisher, 'regress_sub1'); | ||||
|  | ||||
| # Verify that the upgrade should be successful with tables in 'ready'/'init' | ||||
| # state along with retaining the replication origin's remote lsn, and | ||||
| # subscription's running status. | ||||
| $publisher->safe_psql('postgres', | ||||
| 	"CREATE PUBLICATION regress_pub2 FOR TABLE tab_upgraded1"); | ||||
| $old_sub->safe_psql('postgres', | ||||
| 	"CREATE SUBSCRIPTION regress_sub2 CONNECTION '$connstr' PUBLICATION regress_pub2" | ||||
| ); | ||||
| # Wait till the table tab_upgraded1 reaches 'ready' state | ||||
| my $synced_query = | ||||
|   "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'r'"; | ||||
| $old_sub->poll_query_until('postgres', $synced_query) | ||||
|   or die "Timed out while waiting for the table to reach ready state"; | ||||
|  | ||||
| $publisher->safe_psql('postgres', | ||||
| 	"INSERT INTO tab_upgraded1 VALUES (generate_series(1,50))"); | ||||
| $publisher->wait_for_catchup('regress_sub2'); | ||||
|  | ||||
| # Change configuration to prepare a subscription table in init state | ||||
| $old_sub->append_conf('postgresql.conf', | ||||
| 	"max_logical_replication_workers = 0"); | ||||
| $old_sub->restart; | ||||
|  | ||||
| $publisher->safe_psql('postgres', | ||||
| 	"ALTER PUBLICATION regress_pub2 ADD TABLE tab_upgraded2"); | ||||
| $old_sub->safe_psql('postgres', | ||||
| 	"ALTER SUBSCRIPTION regress_sub2 REFRESH PUBLICATION"); | ||||
|  | ||||
| # The table tab_upgraded2 will be in init state as the subscriber | ||||
| # configuration for max_logical_replication_workers is set to 0. | ||||
| my $result = $old_sub->safe_psql('postgres', | ||||
| 	"SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'i'"); | ||||
| is($result, qq(t), "Check that the table is in init state"); | ||||
|  | ||||
| # Get the replication origin's remote_lsn of the old subscriber | ||||
| my $remote_lsn = $old_sub->safe_psql('postgres', | ||||
| 	"SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub2'" | ||||
| ); | ||||
| # Have the subscription in disabled state before upgrade | ||||
| $old_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub2 DISABLE"); | ||||
|  | ||||
| my $tab_upgraded1_oid = $old_sub->safe_psql('postgres', | ||||
| 	"SELECT oid FROM pg_class WHERE relname = 'tab_upgraded1'"); | ||||
| my $tab_upgraded2_oid = $old_sub->safe_psql('postgres', | ||||
| 	"SELECT oid FROM pg_class WHERE relname = 'tab_upgraded2'"); | ||||
|  | ||||
| $old_sub->stop; | ||||
|  | ||||
| # ------------------------------------------------------ | ||||
| # Check that pg_upgrade is successful when all tables are in ready or in | ||||
| # init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is | ||||
| # in init state) along with retaining the replication origin's remote lsn | ||||
| # and subscription's running status. | ||||
| # ------------------------------------------------------ | ||||
| command_ok( | ||||
| 	[ | ||||
| 		'pg_upgrade', '--no-sync', '-d', $old_sub->data_dir, | ||||
| 		'-D', $new_sub->data_dir, '-b', $oldbindir, | ||||
| 		'-B', $newbindir, '-s', $new_sub->host, | ||||
| 		'-p', $old_sub->port, '-P', $new_sub->port, | ||||
| 		$mode | ||||
| 	], | ||||
| 	'run of pg_upgrade for old instance when the subscription tables are in init/ready state' | ||||
| ); | ||||
| ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d", | ||||
| 	"pg_upgrade_output.d/ removed after successful pg_upgrade"); | ||||
|  | ||||
| # ------------------------------------------------------ | ||||
| # Check that the data inserted to the publisher when the new subscriber is down | ||||
| # will be replicated once it is started. Also check that the old subscription | ||||
| # states and relations origins are all preserved. | ||||
| # ------------------------------------------------------ | ||||
| $publisher->safe_psql( | ||||
| 	'postgres', qq[ | ||||
| 		INSERT INTO tab_upgraded1 VALUES(51); | ||||
| 		INSERT INTO tab_upgraded2 VALUES(1); | ||||
| ]); | ||||
|  | ||||
| $new_sub->start; | ||||
|  | ||||
| # The subscription's running status should be preserved. Old subscription | ||||
| # regress_sub1 should be enabled and old subscription regress_sub2 should be | ||||
| # disabled. | ||||
| $result = | ||||
|   $new_sub->safe_psql('postgres', | ||||
| 	"SELECT subname, subenabled FROM pg_subscription ORDER BY subname"); | ||||
| is( $result, qq(regress_sub1|t | ||||
| regress_sub2|f), | ||||
| 	"check that the subscription's running status are preserved"); | ||||
|  | ||||
| my $sub_oid = $new_sub->safe_psql('postgres', | ||||
| 	"SELECT oid FROM pg_subscription WHERE subname = 'regress_sub2'"); | ||||
|  | ||||
| # Subscription relations should be preserved | ||||
| $result = $new_sub->safe_psql('postgres', | ||||
| 	"SELECT srrelid, srsubstate FROM pg_subscription_rel WHERE srsubid = $sub_oid ORDER BY srrelid" | ||||
| ); | ||||
| is( $result, qq($tab_upgraded1_oid|r | ||||
| $tab_upgraded2_oid|i), | ||||
| 	"there should be 2 rows in pg_subscription_rel(representing tab_upgraded1 and tab_upgraded2)" | ||||
| ); | ||||
|  | ||||
| # The replication origin's remote_lsn should be preserved | ||||
| $result = $new_sub->safe_psql('postgres', | ||||
| 	"SELECT remote_lsn FROM pg_replication_origin_status WHERE external_id = 'pg_' || $sub_oid" | ||||
| ); | ||||
| is($result, qq($remote_lsn), "remote_lsn should have been preserved"); | ||||
|  | ||||
| # Enable the subscription | ||||
| $new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub2 ENABLE"); | ||||
|  | ||||
| # Wait until all tables of subscription 'regress_sub2' are synchronized | ||||
| $new_sub->wait_for_subscription_sync($publisher, 'regress_sub2'); | ||||
|  | ||||
| # Rows on tab_upgraded1 and tab_upgraded2 should have been replicated | ||||
| $result = | ||||
|   $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded1"); | ||||
| is($result, qq(51), "check replicated inserts on new subscriber"); | ||||
| $result = | ||||
|   $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded2"); | ||||
| is($result, qq(1), | ||||
| 	"check the data is synced after enabling the subscription for the table that was in init state" | ||||
| ); | ||||
|  | ||||
| # cleanup | ||||
| $new_sub->stop; | ||||
| $old_sub->append_conf('postgresql.conf', | ||||
| 	"max_logical_replication_workers = 4"); | ||||
| $old_sub->start; | ||||
| $old_sub->safe_psql( | ||||
| 	'postgres', qq[ | ||||
| 		ALTER SUBSCRIPTION regress_sub1 DISABLE; | ||||
| 		ALTER SUBSCRIPTION regress_sub1 SET (slot_name = none); | ||||
| 		DROP SUBSCRIPTION regress_sub1; | ||||
| ]); | ||||
| $old_sub->stop; | ||||
|  | ||||
| # ------------------------------------------------------ | ||||
| # Check that pg_upgrade fails when max_replication_slots configured in the new | ||||
| # cluster is less than the number of subscriptions in the old cluster. | ||||
| # ------------------------------------------------------ | ||||
| my $new_sub1 = PostgreSQL::Test::Cluster->new('new_sub1'); | ||||
| $new_sub1->init; | ||||
| $new_sub1->append_conf('postgresql.conf', "max_replication_slots = 0"); | ||||
|  | ||||
| # pg_upgrade will fail because the new cluster has insufficient | ||||
| # max_replication_slots. | ||||
| command_checks_all( | ||||
| 	[ | ||||
| 		'pg_upgrade', '--no-sync', | ||||
| 		'-d', $old_sub->data_dir, | ||||
| 		'-D', $new_sub1->data_dir, | ||||
| 		'-b', $oldbindir, | ||||
| 		'-B', $newbindir, | ||||
| 		'-s', $new_sub1->host, | ||||
| 		'-p', $old_sub->port, | ||||
| 		'-P', $new_sub1->port, | ||||
| 		$mode, '--check', | ||||
| 	], | ||||
| 	1, | ||||
| 	[ | ||||
| 		qr/max_replication_slots \(0\) must be greater than or equal to the number of subscriptions \(1\) on the old cluster/ | ||||
| 	], | ||||
| 	[qr//], | ||||
| 	'run of pg_upgrade where the new cluster has insufficient max_replication_slots' | ||||
| ); | ||||
|  | ||||
| # Reset max_replication_slots | ||||
| $new_sub1->append_conf('postgresql.conf', "max_replication_slots = 10"); | ||||
|  | ||||
| # Drop the subscription | ||||
| $old_sub->start; | ||||
| $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub2"); | ||||
|  | ||||
| # ------------------------------------------------------ | ||||
| # Check that pg_upgrade refuses to run if: | ||||
| # a) there's a subscription with tables in a state other than 'r' (ready) or | ||||
| #    'i' (init) and/or | ||||
| # b) the subscription has no replication origin. | ||||
| # ------------------------------------------------------ | ||||
| $publisher->safe_psql( | ||||
| 	'postgres', qq[ | ||||
| 		CREATE TABLE tab_primary_key(id serial PRIMARY KEY); | ||||
| 		INSERT INTO tab_primary_key values(1); | ||||
| 		CREATE PUBLICATION regress_pub3 FOR TABLE tab_primary_key; | ||||
| ]); | ||||
|  | ||||
| # Insert the same value that is already present in publisher to the primary key | ||||
| # column of subscriber so that the table sync will fail. | ||||
| $old_sub->safe_psql( | ||||
| 	'postgres', qq[ | ||||
| 		CREATE TABLE tab_primary_key(id serial PRIMARY KEY); | ||||
| 		INSERT INTO tab_primary_key values(1); | ||||
| 		CREATE SUBSCRIPTION regress_sub3 CONNECTION '$connstr' PUBLICATION regress_pub3; | ||||
| ]); | ||||
|  | ||||
| # Table will be in 'd' (data is being copied) state as table sync will fail | ||||
| # because of primary key constraint error. | ||||
| my $started_query = | ||||
|   "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'd'"; | ||||
| $old_sub->poll_query_until('postgres', $started_query) | ||||
|   or die | ||||
|   "Timed out while waiting for the table state to become 'd' (datasync)"; | ||||
|  | ||||
| # Create another subscription and drop the subscription's replication origin | ||||
| $old_sub->safe_psql('postgres', | ||||
| 	"CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub3 WITH (enabled = false)" | ||||
| ); | ||||
| $sub_oid = $old_sub->safe_psql('postgres', | ||||
| 	"SELECT oid FROM pg_subscription WHERE subname = 'regress_sub4'"); | ||||
| my $reporigin = 'pg_' . qq($sub_oid); | ||||
| $old_sub->safe_psql('postgres', | ||||
| 	"SELECT pg_replication_origin_drop('$reporigin')"); | ||||
|  | ||||
| $old_sub->stop; | ||||
|  | ||||
| command_fails( | ||||
| 	[ | ||||
| 		'pg_upgrade', '--no-sync', | ||||
| 		'-d', $old_sub->data_dir, | ||||
| 		'-D', $new_sub1->data_dir, | ||||
| 		'-b', $oldbindir, | ||||
| 		'-B', $newbindir, | ||||
| 		'-s', $new_sub1->host, | ||||
| 		'-p', $old_sub->port, | ||||
| 		'-P', $new_sub1->port, | ||||
| 		$mode, '--check', | ||||
| 	], | ||||
| 	'run of pg_upgrade --check for old instance with relation in \'d\' datasync(invalid) state and missing replication origin' | ||||
| ); | ||||
|  | ||||
| # Verify the reason why the subscriber cannot be upgraded | ||||
| my $sub_relstate_filename; | ||||
|  | ||||
| # Find a txt file that contains a list of tables that cannot be upgraded. We | ||||
| # cannot predict the file's path because the output directory contains a | ||||
| # milliseconds timestamp. File::Find::find must be used. | ||||
| find( | ||||
| 	sub { | ||||
| 		if ($File::Find::name =~ m/subs_invalid\.txt/) | ||||
| 		{ | ||||
| 			$sub_relstate_filename = $File::Find::name; | ||||
| 		} | ||||
| 	}, | ||||
| 	$new_sub1->data_dir . "/pg_upgrade_output.d"); | ||||
|  | ||||
| # Check the file content which should have tab_primary_key table in invalid | ||||
| # state. | ||||
| like( | ||||
| 	slurp_file($sub_relstate_filename), | ||||
| 	qr/The table sync state \"d\" is not allowed for database:\"postgres\" subscription:\"regress_sub3\" schema:\"public\" relation:\"tab_primary_key\"/m, | ||||
| 	'the previous test failed due to subscription table in invalid state'); | ||||
|  | ||||
| # Check the file content which should have regress_sub4 subscription. | ||||
| like( | ||||
| 	slurp_file($sub_relstate_filename), | ||||
| 	qr/The replication origin is missing for database:\"postgres\" subscription:\"regress_sub4\"/m, | ||||
| 	'the previous test failed due to missing replication origin'); | ||||
|  | ||||
| done_testing(); | ||||
| @@ -57,6 +57,6 @@ | ||||
|  */ | ||||
|  | ||||
| /*							yyyymmddN */ | ||||
| #define CATALOG_VERSION_NO	202312271 | ||||
| #define CATALOG_VERSION_NO	202401021 | ||||
|  | ||||
| #endif | ||||
|   | ||||
| @@ -11410,6 +11410,16 @@ | ||||
|   proname => 'binary_upgrade_logical_slot_has_caught_up', provolatile => 'v', | ||||
|   proparallel => 'u', prorettype => 'bool', proargtypes => 'name', | ||||
|   prosrc => 'binary_upgrade_logical_slot_has_caught_up' }, | ||||
| { oid => '8404', descr => 'for use by pg_upgrade (relation for pg_subscription_rel)', | ||||
|   proname => 'binary_upgrade_add_sub_rel_state', proisstrict => 'f', | ||||
|   provolatile => 'v', proparallel => 'u', prorettype => 'void', | ||||
|   proargtypes => 'text oid char pg_lsn', | ||||
|   prosrc => 'binary_upgrade_add_sub_rel_state' }, | ||||
| { oid => '8405', descr => 'for use by pg_upgrade (remote_lsn for origin)', | ||||
|   proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f', | ||||
|   provolatile => 'v', proparallel => 'u', prorettype => 'void', | ||||
|   proargtypes => 'text pg_lsn', | ||||
|   prosrc => 'binary_upgrade_replorigin_advance' }, | ||||
|  | ||||
| # conversion functions | ||||
| { oid => '4302', | ||||
|   | ||||
| @@ -81,7 +81,7 @@ typedef struct SubscriptionRelState | ||||
| } SubscriptionRelState; | ||||
|  | ||||
| extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, | ||||
| 									XLogRecPtr sublsn); | ||||
| 									XLogRecPtr sublsn, bool retain_lock); | ||||
| extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, | ||||
| 									   XLogRecPtr sublsn); | ||||
| extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); | ||||
|   | ||||
| @@ -2669,6 +2669,7 @@ SubLinkType | ||||
| SubOpts | ||||
| SubPlan | ||||
| SubPlanState | ||||
| SubRelInfo | ||||
| SubRemoveRels | ||||
| SubTransactionId | ||||
| SubXactCallback | ||||
|   | ||||
		Reference in New Issue
	
	Block a user