From be1cc8f46f57a04e69d9e4dd268d34da885fe6eb Mon Sep 17 00:00:00 2001
From: Simon Riggs <simon@2ndQuadrant.com>
Date: Mon, 17 Nov 2014 22:15:07 +0000
Subject: [PATCH] Add pg_dump --snapshot option

Allows pg_dump to use a snapshot previously defined by a concurrent
session that has either used pg_export_snapshot() or obtained a
snapshot when creating a logical slot. When this option is used with
parallel pg_dump, the snapshot defined by this option is used and no
new snapshot is taken.

Simon Riggs and Michael Paquier
---
 doc/src/sgml/ref/pg_dump.sgml | 21 +++++++++++++++
 src/bin/pg_dump/pg_dump.c     | 51 +++++++++++++++++++++++------------
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index c92c6eef5d3..a6e7b08e2e2 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -847,6 +847,27 @@ PostgreSQL documentation
        </listitem>
      </varlistentry>
 
+     <varlistentry>
+       <term><option>--snapshot=<replaceable class="parameter">snapshotname</replaceable></option></term>
+       <listitem>
+         <para>
+          Use the specifed synchronized snapshot when making a dump of the
+          database (see
+          <xref linkend="functions-snapshot-synchronization-table"> for more
+          details).
+         </para>
+         <para>
+          This option is useful when needing to synchronize the dump with
+          a logical replication slot (see <xref linkend="logicaldecoding">)
+          or with a concurrent session.
+         </para>
+         <para>
+          In the case of a parallel dump, the snapshot name defined by this
+          option is used rather than taking a new snapshot.
+         </para>
+       </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--serializable-deferrable</option></term>
       <listitem>
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1e8f089224c..69d359458bc 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -126,7 +126,8 @@ static const CatalogId nilCatalogId = {0, 0};
 
 static void help(const char *progname);
 static void setup_connection(Archive *AH, DumpOptions *dopt,
-				 const char *dumpencoding, char *use_role);
+				const char *dumpencoding, const char *dumpsnapshot,
+				char *use_role);
 static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
 static void expand_schema_name_patterns(Archive *fout,
 							SimpleStringList *patterns,
@@ -269,6 +270,7 @@ main(int argc, char **argv)
 	RestoreOptions *ropt;
 	Archive    *fout;			/* the script file */
 	const char *dumpencoding = NULL;
+	const char *dumpsnapshot = NULL;
 	char	   *use_role = NULL;
 	int			numWorkers = 1;
 	trivalue	prompt_password = TRI_DEFAULT;
@@ -329,6 +331,7 @@ main(int argc, char **argv)
 		{"role", required_argument, NULL, 3},
 		{"section", required_argument, NULL, 5},
 		{"serializable-deferrable", no_argument, &dopt->serializable_deferrable, 1},
+		{"snapshot", required_argument, NULL, 6},
 		{"use-set-session-authorization", no_argument, &dopt->use_setsessauth, 1},
 		{"no-security-labels", no_argument, &dopt->no_security_labels, 1},
 		{"no-synchronized-snapshots", no_argument, &dopt->no_synchronized_snapshots, 1},
@@ -506,6 +509,10 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &dopt->dumpSections);
 				break;
 
+			case 6:				/* snapshot */
+				dumpsnapshot = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -614,7 +621,7 @@ main(int argc, char **argv)
 	 * death.
 	 */
 	ConnectDatabase(fout, dopt->dbname, dopt->pghost, dopt->pgport, dopt->username, prompt_password);
-	setup_connection(fout, dopt, dumpencoding, use_role);
+	setup_connection(fout, dopt, dumpencoding, dumpsnapshot, use_role);
 
 	/*
 	 * Disable security label support if server version < v9.1.x (prevents
@@ -658,6 +665,11 @@ main(int argc, char **argv)
 		  "Run with --no-synchronized-snapshots instead if you do not need\n"
 					  "synchronized snapshots.\n");
 
+	/* check the version when a snapshot is explicitly specified by user */
+	if (dumpsnapshot && fout->remoteVersion < 90200)
+		exit_horribly(NULL,
+			"Exported snapshots are not supported by this server version.\n");
+
 	/* Find the last built-in OID, if needed */
 	if (fout->remoteVersion < 70300)
 	{
@@ -888,6 +900,7 @@ help(const char *progname)
 	printf(_("  --quote-all-identifiers      quote all identifiers, even if not key words\n"));
 	printf(_("  --section=SECTION            dump named section (pre-data, data, or post-data)\n"));
 	printf(_("  --serializable-deferrable    wait until the dump can run without anomalies\n"));
+	printf(_("  --snapshot=SNAPSHOT          use given synchronous snapshot for the dump\n"));
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
@@ -907,7 +920,8 @@ help(const char *progname)
 }
 
 static void
-setup_connection(Archive *AH, DumpOptions *dopt, const char *dumpencoding, char *use_role)
+setup_connection(Archive *AH, DumpOptions *dopt, const char *dumpencoding,
+				 const char *dumpsnapshot, char *use_role)
 {
 	PGconn	   *conn = GetConnection(AH);
 	const char *std_strings;
@@ -1015,22 +1029,25 @@ setup_connection(Archive *AH, DumpOptions *dopt, const char *dumpencoding, char
 		ExecuteSqlStatement(AH,
 							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
 
+	/*
+	 * define an export snapshot, either chosen by user or needed for
+	 * parallel dump.
+	 */
+	if (dumpsnapshot)
+		AH->sync_snapshot_id = strdup(dumpsnapshot);
 
-
-	if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !dopt->no_synchronized_snapshots)
+	if (AH->sync_snapshot_id)
 	{
-		if (AH->sync_snapshot_id)
-		{
-			PQExpBuffer query = createPQExpBuffer();
-
-			appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
-			appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
-			ExecuteSqlStatement(AH, query->data);
-			destroyPQExpBuffer(query);
-		}
-		else
-			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
+		PQExpBuffer query = createPQExpBuffer();
+		appendPQExpBuffer(query, "SET TRANSACTION SNAPSHOT ");
+		appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
+		ExecuteSqlStatement(AH, query->data);
+		destroyPQExpBuffer(query);
 	}
+	else if (AH->numWorkers > 1 &&
+			 AH->remoteVersion >= 90200 &&
+			 !dopt->no_synchronized_snapshots)
+		AH->sync_snapshot_id = get_synchronized_snapshot(AH);
 
 	if (AH->remoteVersion >= 90500)
 	{
@@ -1044,7 +1061,7 @@ setup_connection(Archive *AH, DumpOptions *dopt, const char *dumpencoding, char
 static void
 setupDumpWorker(Archive *AHX, DumpOptions *dopt, RestoreOptions *ropt)
 {
-	setup_connection(AHX, dopt, NULL, NULL);
+	setup_connection(AHX, dopt, NULL, NULL, NULL);
 }
 
 static char *