mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-24 01:29:19 +03:00 
			
		
		
		
	Modify pg_basebackup to use a new COPY subprotocol for base backups.
In the new approach, all files across all tablespaces are sent in a single COPY OUT operation. The CopyData messages are no longer raw archive content; rather, each message is prefixed with a type byte that describes its purpose, e.g. 'n' signifies the start of a new archive and 'd' signifies archive or manifest data. This protocol is significantly more extensible than the old approach, since we can later create more message types, though not without concern for backward compatibility.

The new protocol sends a few things to the client that the old one did not. First, it sends the name of each archive explicitly, instead of letting the client compute it. This is intended to make it easier to write future patches that might send archives in a format other than tar (e.g. cpio, pax, tar.gz). Second, it sends explicit progress messages rather than allowing the client to assume that progress is defined by the number of bytes received. This will help with future features where the server compresses the data, or sends it someplace directly rather than transmitting it to the client.

The old protocol is still supported for compatibility with previous releases. The new protocol is selected by means of a new TARGET option to the BASE_BACKUP command. Currently, the only supported target is 'client'. Support for additional targets will be added in a later commit.

Patch by me. The patch set of which this is a part has had review and/or testing from Jeevan Ladhe, Tushar Ahuja, Suraj Kharage, Dipesh Pandit, and Mark Dilger.

Discussion: http://postgr.es/m/CA+TgmoaYZbz0=Yk797aOJwkGJC-LK3iXn+wzzMx7KdwNpZhS5g@mail.gmail.com
		| @@ -2630,6 +2630,22 @@ The commands accepted in replication mode are: | ||||
|         </listitem> | ||||
|        </varlistentry> | ||||
|  | ||||
|        <varlistentry> | ||||
|         <term><literal>TARGET</literal> <replaceable>'target'</replaceable></term> | ||||
|         <listitem> | ||||
|          <para> | ||||
|           Tells the server where to send the backup. If not specified, | ||||
|           the legacy base backup protocol will be used. Otherwise, the new | ||||
|           protocol will be used, as described below. | ||||
|          </para> | ||||
|  | ||||
|          <para> | ||||
|           At present, the only supported value for this parameter is | ||||
|           <literal>client</literal>. | ||||
|          </para> | ||||
|         </listitem> | ||||
|        </varlistentry> | ||||
|  | ||||
|        <varlistentry> | ||||
|         <term><literal>PROGRESS [ <replaceable class="parameter">boolean</replaceable> ]</literal></term> | ||||
|         <listitem> | ||||
| @@ -2805,19 +2821,113 @@ The commands accepted in replication mode are: | ||||
|  | ||||
|      <para> | ||||
|       After the second regular result set, one or more CopyOutResponse results | ||||
|       will be sent, one for the main data directory and one for each additional tablespace other | ||||
|       than <literal>pg_default</literal> and <literal>pg_global</literal>. The data in | ||||
|       the CopyOutResponse results will be a tar format (following the | ||||
|       <quote>ustar interchange format</quote> specified in the POSIX 1003.1-2008 | ||||
|       standard) dump of the tablespace contents. Prior to | ||||
|       will be sent. If the <literal>TARGET</literal> option is not specified, | ||||
|       the legacy base backup protocol will be used. In this mode, | ||||
|       there will be one CopyOutResponse for the main directory, one for each | ||||
|       additional tablespace other than <literal>pg_default</literal> and | ||||
|       <literal>pg_global</literal>, and one for the backup manifest if | ||||
|       requested. The main data directory and any additional tablespaces will | ||||
|       be sent in tar format (following the <quote>ustar interchange | ||||
|       format</quote> specified in the POSIX 1003.1-2008 standard), and | ||||
|       the manifest will be sent as a plain file.  Prior to | ||||
|       <literal>PostgreSQL</literal> 15, the server omitted the two trailing | ||||
|       blocks of zeroes specified in the standard, but this is no longer the | ||||
|       case. | ||||
|       After the tar data is complete, and if a backup manifest was requested, | ||||
|       another CopyOutResponse result is sent, containing the manifest data for the | ||||
|       current base backup. In any case, a final ordinary result set will be | ||||
|       sent, containing the WAL end position of the backup, in the same format as | ||||
|       the start position. | ||||
|      </para> | ||||
|   | ||||
|      <para> | ||||
|       New applications should specify the <literal>TARGET</literal> option. | ||||
|       When that option is used, a single CopyOutResponse will be sent, and | ||||
|       the payload of each CopyData message will contain a message in one of | ||||
|       the following formats: | ||||
|      </para> | ||||
|  | ||||
|      <para> | ||||
|       <variablelist> | ||||
|  | ||||
|       <varlistentry> | ||||
|         <term>new archive (B)</term> | ||||
|         <listitem><para><variablelist> | ||||
|           <varlistentry> | ||||
|             <term>Byte1('n')</term> | ||||
|             <listitem><para> | ||||
|               Identifies the message as indicating the start of a new archive. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|           <varlistentry> | ||||
|             <term>String</term> | ||||
|             <listitem><para> | ||||
|               The file name for this archive. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|           <varlistentry> | ||||
|             <term>String</term> | ||||
|             <listitem><para> | ||||
|               For the main data directory, an empty string. For other | ||||
|               tablespaces, the full path to the directory from which this | ||||
|               archive was created. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|         </variablelist></para></listitem> | ||||
|       </varlistentry> | ||||
|  | ||||
|       <varlistentry> | ||||
|         <term>manifest (B)</term> | ||||
|         <listitem><para><variablelist> | ||||
|           <varlistentry> | ||||
|             <term>Byte1('m')</term> | ||||
|             <listitem><para> | ||||
|               Identifies the message as indicating the start of the backup | ||||
|               manifest. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|         </variablelist></para></listitem> | ||||
|       </varlistentry> | ||||
|  | ||||
|       <varlistentry> | ||||
|         <term>archive or manifest data (B)</term> | ||||
|         <listitem><para><variablelist> | ||||
|           <varlistentry> | ||||
|             <term>Byte1('d')</term> | ||||
|             <listitem><para> | ||||
|               Identifies the message as containing archive or manifest data. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|           <varlistentry> | ||||
|             <term>Byte<replaceable>n</replaceable></term> | ||||
|             <listitem><para> | ||||
|               Data bytes. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|         </variablelist></para></listitem> | ||||
|       </varlistentry> | ||||
|  | ||||
|       <varlistentry> | ||||
|         <term>progress report (B)</term> | ||||
|         <listitem><para><variablelist> | ||||
|           <varlistentry> | ||||
|             <term>Byte1('p')</term> | ||||
|             <listitem><para> | ||||
|               Identifies the message as a progress report. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|           <varlistentry> | ||||
|             <term>Int64</term> | ||||
|             <listitem><para> | ||||
|               The number of bytes from the current tablespace for which | ||||
|               processing has been completed. | ||||
|             </para></listitem> | ||||
|           </varlistentry> | ||||
|         </variablelist></para></listitem> | ||||
|       </varlistentry> | ||||
|  | ||||
|       </variablelist> | ||||
|      </para> | ||||
|  | ||||
|      <para> | ||||
|       After the CopyOutResponse, or all such responses, have been sent, a | ||||
|       final ordinary result set will be sent, containing the WAL end position | ||||
|       of the backup, in the same format as the start position. | ||||
|      </para> | ||||
|  | ||||
|      <para> | ||||
|   | ||||
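The message formats documented in the hunk above can be decoded with a small bounds-checked cursor. What follows is a minimal sketch in standalone C, not the pg_basebackup implementation: `payload_cursor`, `read_byte`, `read_string`, `read_uint64`, `decode_new_archive`, and `decode_progress` are hypothetical names, and the sketch assumes the caller has already extracted one CopyData payload. It also assumes the Int64 is in network byte order, as integers are elsewhere in the wire protocol.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical cursor over one CopyData payload; not a PostgreSQL type. */
typedef struct
{
	const char *data;
	size_t		len;
	size_t		pos;
} payload_cursor;

/* Read one byte; returns 0 on success, -1 if the payload is exhausted. */
static int
read_byte(payload_cursor *c, char *out)
{
	if (c->pos >= c->len)
		return -1;
	*out = c->data[c->pos++];
	return 0;
}

/* Read a NUL-terminated string in place; -1 if no terminator is found. */
static int
read_string(payload_cursor *c, const char **out)
{
	const char *start = c->data + c->pos;
	const char *nul = memchr(start, '\0', c->len - c->pos);

	if (nul == NULL)
		return -1;
	*out = start;
	c->pos += (size_t) (nul - start) + 1;
	return 0;
}

/* Read a big-endian (network order) 64-bit integer. */
static int
read_uint64(payload_cursor *c, uint64_t *out)
{
	uint64_t	v = 0;

	if (c->len - c->pos < 8)
		return -1;
	for (int i = 0; i < 8; i++)
		v = (v << 8) | (unsigned char) c->data[c->pos++];
	*out = v;
	return 0;
}

/* Decode a "new archive" payload: 'n', archive name, tablespace path. */
static int
decode_new_archive(const char *data, size_t len,
				   const char **name, const char **path)
{
	payload_cursor c = {data, len, 0};
	char		type;

	assert(data != NULL);
	if (read_byte(&c, &type) != 0 || type != 'n')
		return -1;
	if (read_string(&c, name) != 0 || read_string(&c, path) != 0)
		return -1;
	return c.pos == c.len ? 0 : -1;	/* reject trailing bytes */
}

/* Decode a "progress report" payload: 'p', then Int64 bytes completed. */
static int
decode_progress(const char *data, size_t len, uint64_t *bytes_done)
{
	payload_cursor c = {data, len, 0};
	char		type;

	assert(data != NULL);
	if (read_byte(&c, &type) != 0 || type != 'p')
		return -1;
	if (read_uint64(&c, bytes_done) != 0)
		return -1;
	return c.pos == c.len ? 0 : -1;	/* reject trailing bytes */
}
```

Rejecting trailing bytes is a design choice in this sketch: it keeps a malformed or unexpectedly extended message from being silently accepted. An empty second string in an 'n' payload corresponds to the main data directory.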
| @@ -53,6 +53,12 @@ | ||||
|  */ | ||||
| #define SINK_BUFFER_LENGTH			Max(32768, BLCKSZ) | ||||
|  | ||||
| typedef enum | ||||
| { | ||||
| 	BACKUP_TARGET_COMPAT, | ||||
| 	BACKUP_TARGET_CLIENT | ||||
| } backup_target_type; | ||||
|  | ||||
| typedef struct | ||||
| { | ||||
| 	const char *label; | ||||
| @@ -62,6 +68,7 @@ typedef struct | ||||
| 	bool		includewal; | ||||
| 	uint32		maxrate; | ||||
| 	bool		sendtblspcmapfile; | ||||
| 	backup_target_type target; | ||||
| 	backup_manifest_option manifest; | ||||
| 	pg_checksum_type manifest_checksum_type; | ||||
| } basebackup_options; | ||||
| @@ -694,8 +701,10 @@ parse_basebackup_options(List *options, basebackup_options *opt) | ||||
| 	bool		o_noverify_checksums = false; | ||||
| 	bool		o_manifest = false; | ||||
| 	bool		o_manifest_checksums = false; | ||||
| 	bool		o_target = false; | ||||
|  | ||||
| 	MemSet(opt, 0, sizeof(*opt)); | ||||
| 	opt->target = BACKUP_TARGET_COMPAT; | ||||
| 	opt->manifest = MANIFEST_OPTION_NO; | ||||
| 	opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C; | ||||
|  | ||||
| @@ -836,6 +845,22 @@ parse_basebackup_options(List *options, basebackup_options *opt) | ||||
| 								optval))); | ||||
| 			o_manifest_checksums = true; | ||||
| 		} | ||||
| 		else if (strcmp(defel->defname, "target") == 0) | ||||
| 		{ | ||||
| 			char	   *optval = defGetString(defel); | ||||
|  | ||||
| 			if (o_target) | ||||
| 				ereport(ERROR, | ||||
| 						(errcode(ERRCODE_SYNTAX_ERROR), | ||||
| 						 errmsg("duplicate option \"%s\"", defel->defname))); | ||||
| 			if (strcmp(optval, "client") == 0) | ||||
| 				opt->target = BACKUP_TARGET_CLIENT; | ||||
| 			else | ||||
| 				ereport(ERROR, | ||||
| 						(errcode(ERRCODE_SYNTAX_ERROR), | ||||
| 						 errmsg("unrecognized target: \"%s\"", optval))); | ||||
| 			o_target = true; | ||||
| 		} | ||||
| 		else | ||||
| 			ereport(ERROR, | ||||
| 					errcode(ERRCODE_SYNTAX_ERROR), | ||||
| @@ -881,7 +906,14 @@ SendBaseBackup(BaseBackupCmd *cmd) | ||||
| 		set_ps_display(activitymsg); | ||||
| 	} | ||||
|  | ||||
| 	/* Create a basic basebackup sink. */ | ||||
| 	/* | ||||
| 	 * If the TARGET option was specified, then we can use the new copy-stream | ||||
| 	 * protocol. If not, we must fall back to the old and less capable | ||||
| 	 * copy-tablespace protocol. | ||||
| 	 */ | ||||
| 	if (opt.target != BACKUP_TARGET_COMPAT) | ||||
| 		sink = bbsink_copystream_new(); | ||||
| 	else | ||||
| 		sink = bbsink_copytblspc_new(); | ||||
|  | ||||
| 	/* Set up network throttling, if client requested it */ | ||||
|   | ||||
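The option handling in the hunks above reduces to three rules: the target defaults to the compatibility mode, a repeated TARGET is rejected, and only 'client' is recognized. The sketch below restates those rules as standalone C under loudly stated assumptions: `target_kind`, `parse_status`, `target_options`, `init_target_options`, `set_target`, and `uses_copystream` are hypothetical names, and status codes stand in for the server's `ereport(ERROR, ...)` calls.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-ins for backup_target_type and the error reports. */
typedef enum
{
	TARGET_COMPAT,				/* no TARGET given: legacy protocol */
	TARGET_CLIENT				/* TARGET 'client': single COPY stream */
} target_kind;

typedef enum
{
	PARSE_OK,
	PARSE_DUPLICATE,			/* TARGET specified more than once */
	PARSE_UNRECOGNIZED			/* value other than "client" */
} parse_status;

typedef struct
{
	target_kind target;
	bool		seen_target;
} target_options;

/* Start from the compatibility default, as the diff does after MemSet. */
static void
init_target_options(target_options *opt)
{
	opt->target = TARGET_COMPAT;
	opt->seen_target = false;
}

/* Apply one TARGET option; the duplicate check comes before validation. */
static parse_status
set_target(target_options *opt, const char *optval)
{
	assert(opt != NULL && optval != NULL);
	if (opt->seen_target)
		return PARSE_DUPLICATE;
	if (strcmp(optval, "client") != 0)
		return PARSE_UNRECOGNIZED;
	opt->target = TARGET_CLIENT;
	opt->seen_target = true;
	return PARSE_OK;
}

/* Any target other than the compatibility default selects the new sink. */
static bool
uses_copystream(const target_options *opt)
{
	return opt->target != TARGET_COMPAT;
}
```

Testing `target != TARGET_COMPAT` rather than `target == TARGET_CLIENT` mirrors the condition in `SendBaseBackup`, so any target added later would select the new protocol without changing that check.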
| @@ -1,8 +1,27 @@ | ||||
| /*------------------------------------------------------------------------- | ||||
|  * | ||||
|  * basebackup_copy.c | ||||
|  *	  send basebackup archives using one COPY OUT operation per | ||||
|  *	  tablespace, and an additional COPY OUT for the backup manifest | ||||
|  *	  send basebackup archives using COPY OUT | ||||
|  * | ||||
|  * We have two different ways of doing this. | ||||
|  * | ||||
|  * 'copytblspc' is an older method still supported for compatibility | ||||
|  * with releases prior to v15. In this method, a separate COPY OUT | ||||
|  * operation is used for each tablespace. The manifest, if it is sent, | ||||
|  * uses an additional COPY OUT operation. | ||||
|  * | ||||
|  * 'copystream' starts a single COPY OUT operation and transmits | ||||
|  * all the archives and the manifest if present during the course of that | ||||
|  * single COPY OUT. Each CopyData message begins with a type byte, | ||||
|  * allowing us to signal the start of a new archive, or the manifest, | ||||
|  * by some means other than ending the COPY stream. This also allows | ||||
|  * this protocol to be extended more easily, since we can include | ||||
|  * arbitrary information in the message stream as long as we're certain | ||||
|  * that the client will know what to do with it. | ||||
|  * | ||||
|  * Regardless of which method is used, we send a result set with | ||||
|  * information about the tablespaces to be included in the backup before | ||||
|  * starting COPY OUT. This result has the same format in every method. | ||||
|  * | ||||
|  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group | ||||
|  * | ||||
| @@ -18,6 +37,52 @@ | ||||
| #include "libpq/pqformat.h" | ||||
| #include "replication/basebackup.h" | ||||
| #include "replication/basebackup_sink.h" | ||||
| #include "utils/timestamp.h" | ||||
|  | ||||
| typedef struct bbsink_copystream | ||||
| { | ||||
| 	/* Common information for all types of sink. */ | ||||
| 	bbsink		base; | ||||
|  | ||||
| 	/* | ||||
| 	 * Protocol message buffer. We assemble CopyData protocol messages by | ||||
| 	 * setting the first character of this buffer to 'd' (archive or manifest | ||||
| 	 * data) and then making base.bbs_buffer point to the second character so | ||||
| 	 * that the rest of the data gets copied into the message just where we | ||||
| 	 * want it. | ||||
| 	 */ | ||||
| 	char	   *msgbuffer; | ||||
|  | ||||
| 	/* | ||||
| 	 * When did we last report progress to the client, and how much progress | ||||
| 	 * did we report? | ||||
| 	 */ | ||||
| 	TimestampTz last_progress_report_time; | ||||
| 	uint64		bytes_done_at_last_time_check; | ||||
| } bbsink_copystream; | ||||
|  | ||||
| /* | ||||
|  * We don't want to send progress messages to the client excessively | ||||
|  * frequently. Ideally, we'd like to send a message when the time since the | ||||
|  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking | ||||
|  * the system time every time we send a tiny bit of data seems too expensive. | ||||
|  * So we only check it after the number of bytes since the last check reaches | ||||
|  * PROGRESS_REPORT_BYTE_INTERVAL. | ||||
|  */ | ||||
| #define	PROGRESS_REPORT_BYTE_INTERVAL				65536 | ||||
| #define PROGRESS_REPORT_MILLISECOND_THRESHOLD		1000 | ||||
|  | ||||
| static void bbsink_copystream_begin_backup(bbsink *sink); | ||||
| static void bbsink_copystream_begin_archive(bbsink *sink, | ||||
| 											const char *archive_name); | ||||
| static void bbsink_copystream_archive_contents(bbsink *sink, size_t len); | ||||
| static void bbsink_copystream_end_archive(bbsink *sink); | ||||
| static void bbsink_copystream_begin_manifest(bbsink *sink); | ||||
| static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len); | ||||
| static void bbsink_copystream_end_manifest(bbsink *sink); | ||||
| static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, | ||||
| 										 TimeLineID endtli); | ||||
| static void bbsink_copystream_cleanup(bbsink *sink); | ||||
|  | ||||
| static void bbsink_copytblspc_begin_backup(bbsink *sink); | ||||
| static void bbsink_copytblspc_begin_archive(bbsink *sink, | ||||
| @@ -38,6 +103,18 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); | ||||
| static void SendTablespaceList(List *tablespaces); | ||||
| static void send_int8_string(StringInfoData *buf, int64 intval); | ||||
|  | ||||
| const bbsink_ops bbsink_copystream_ops = { | ||||
| 	.begin_backup = bbsink_copystream_begin_backup, | ||||
| 	.begin_archive = bbsink_copystream_begin_archive, | ||||
| 	.archive_contents = bbsink_copystream_archive_contents, | ||||
| 	.end_archive = bbsink_copystream_end_archive, | ||||
| 	.begin_manifest = bbsink_copystream_begin_manifest, | ||||
| 	.manifest_contents = bbsink_copystream_manifest_contents, | ||||
| 	.end_manifest = bbsink_copystream_end_manifest, | ||||
| 	.end_backup = bbsink_copystream_end_backup, | ||||
| 	.cleanup = bbsink_copystream_cleanup | ||||
| }; | ||||
|  | ||||
| const bbsink_ops bbsink_copytblspc_ops = { | ||||
| 	.begin_backup = bbsink_copytblspc_begin_backup, | ||||
| 	.begin_archive = bbsink_copytblspc_begin_archive, | ||||
| @@ -50,6 +127,202 @@ const bbsink_ops bbsink_copytblspc_ops = { | ||||
| 	.cleanup = bbsink_copytblspc_cleanup | ||||
| }; | ||||
|  | ||||
| /* | ||||
|  * Create a new 'copystream' bbsink. | ||||
|  */ | ||||
| bbsink * | ||||
| bbsink_copystream_new(void) | ||||
| { | ||||
| 	bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream)); | ||||
|  | ||||
| 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops; | ||||
|  | ||||
| 	/* Set up for periodic progress reporting. */ | ||||
| 	sink->last_progress_report_time = GetCurrentTimestamp(); | ||||
| 	sink->bytes_done_at_last_time_check = UINT64CONST(0); | ||||
|  | ||||
| 	return &sink->base; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Send start-of-backup wire protocol messages. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_begin_backup(bbsink *sink) | ||||
| { | ||||
| 	bbsink_copystream *mysink = (bbsink_copystream *) sink; | ||||
| 	bbsink_state *state = sink->bbs_state; | ||||
|  | ||||
| 	/* | ||||
| 	 * Initialize buffer. We ultimately want to send the archive and manifest | ||||
| 	 * data by means of CopyData messages where the payload portion of each | ||||
| 	 * message begins with a type byte, so we set up a buffer that begins with | ||||
| 	 * the type byte we're going to need, and then arrange things so that | ||||
| 	 * the data we're given will be written just after that type byte. That | ||||
| 	 * will allow us to ship the data with a single call to pq_putmessage and | ||||
| 	 * without needing any extra copying. | ||||
| 	 */ | ||||
| 	mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1); | ||||
| 	mysink->base.bbs_buffer = mysink->msgbuffer + 1; | ||||
| 	mysink->msgbuffer[0] = 'd'; /* archive or manifest data */ | ||||
|  | ||||
| 	/* Tell client the backup start location. */ | ||||
| 	SendXlogRecPtrResult(state->startptr, state->starttli); | ||||
|  | ||||
| 	/* Send client a list of tablespaces. */ | ||||
| 	SendTablespaceList(state->tablespaces); | ||||
|  | ||||
| 	/* Send a CommandComplete message */ | ||||
| 	pq_puttextmessage('C', "SELECT"); | ||||
|  | ||||
| 	/* Begin COPY stream. This will be used for all archives + manifest. */ | ||||
| 	SendCopyOutResponse(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Send a CopyData message announcing the beginning of a new archive. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name) | ||||
| { | ||||
| 	bbsink_state *state = sink->bbs_state; | ||||
| 	tablespaceinfo *ti; | ||||
| 	StringInfoData buf; | ||||
|  | ||||
| 	ti = list_nth(state->tablespaces, state->tablespace_num); | ||||
| 	pq_beginmessage(&buf, 'd'); /* CopyData */ | ||||
| 	pq_sendbyte(&buf, 'n');		/* New archive */ | ||||
| 	pq_sendstring(&buf, archive_name); | ||||
| 	pq_sendstring(&buf, ti->path == NULL ? "" : ti->path); | ||||
| 	pq_endmessage(&buf); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Send a CopyData message containing a chunk of archive content. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_archive_contents(bbsink *sink, size_t len) | ||||
| { | ||||
| 	bbsink_copystream *mysink = (bbsink_copystream *) sink; | ||||
| 	bbsink_state *state = mysink->base.bbs_state; | ||||
| 	StringInfoData buf; | ||||
| 	uint64		targetbytes; | ||||
|  | ||||
| 	/* Send the archive content to the client (with leading type byte). */ | ||||
| 	pq_putmessage('d', mysink->msgbuffer, len + 1); | ||||
|  | ||||
| 	/* Consider whether to send a progress report to the client. */ | ||||
| 	targetbytes = mysink->bytes_done_at_last_time_check | ||||
| 		+ PROGRESS_REPORT_BYTE_INTERVAL; | ||||
| 	if (targetbytes <= state->bytes_done) | ||||
| 	{ | ||||
| 		TimestampTz now = GetCurrentTimestamp(); | ||||
| 		long		ms; | ||||
|  | ||||
| 		/* | ||||
| 		 * OK, we've sent a decent number of bytes, so check the system time | ||||
| 		 * to see whether we're due to send a progress report. | ||||
| 		 */ | ||||
| 		mysink->bytes_done_at_last_time_check = state->bytes_done; | ||||
| 		ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time, | ||||
| 											 now); | ||||
|  | ||||
| 		/* | ||||
| 		 * Send a progress report if enough time has passed. Also send one if | ||||
| 		 * the system clock was set backward, so that such occurrences don't | ||||
| 		 * have the effect of suppressing further progress messages. | ||||
| 		 */ | ||||
| 		if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD) | ||||
| 		{ | ||||
| 			mysink->last_progress_report_time = now; | ||||
|  | ||||
| 			pq_beginmessage(&buf, 'd'); /* CopyData */ | ||||
| 			pq_sendbyte(&buf, 'p'); /* Progress report */ | ||||
| 			pq_sendint64(&buf, state->bytes_done); | ||||
| 			pq_endmessage(&buf); | ||||
| 			pq_flush_if_writable(); | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * We don't need to explicitly signal the end of the archive; the client | ||||
|  * will figure out that we've reached the end when we begin the next one, | ||||
|  * or begin the manifest, or end the COPY stream. However, this seems like | ||||
|  * a good time to force out a progress report. One reason for that is that | ||||
|  * if this is the last archive, and we don't force a progress report now, | ||||
|  * the client will never be told that we sent all the bytes. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_end_archive(bbsink *sink) | ||||
| { | ||||
| 	bbsink_copystream *mysink = (bbsink_copystream *) sink; | ||||
| 	bbsink_state *state = mysink->base.bbs_state; | ||||
| 	StringInfoData buf; | ||||
|  | ||||
| 	mysink->bytes_done_at_last_time_check = state->bytes_done; | ||||
| 	mysink->last_progress_report_time = GetCurrentTimestamp(); | ||||
| 	pq_beginmessage(&buf, 'd'); /* CopyData */ | ||||
| 	pq_sendbyte(&buf, 'p');		/* Progress report */ | ||||
| 	pq_sendint64(&buf, state->bytes_done); | ||||
| 	pq_endmessage(&buf); | ||||
| 	pq_flush_if_writable(); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Send a CopyData message announcing the beginning of the backup manifest. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_begin_manifest(bbsink *sink) | ||||
| { | ||||
| 	StringInfoData buf; | ||||
|  | ||||
| 	pq_beginmessage(&buf, 'd'); /* CopyData */ | ||||
| 	pq_sendbyte(&buf, 'm');		/* Manifest */ | ||||
| 	pq_endmessage(&buf); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Each chunk of manifest data is sent using a CopyData message. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_manifest_contents(bbsink *sink, size_t len) | ||||
| { | ||||
| 	bbsink_copystream *mysink = (bbsink_copystream *) sink; | ||||
|  | ||||
| 	/* Send the manifest content to the client (with leading type byte). */ | ||||
| 	pq_putmessage('d', mysink->msgbuffer, len + 1); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * We don't need an explicit terminator for the backup manifest. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_end_manifest(bbsink *sink) | ||||
| { | ||||
| 	/* Do nothing. */ | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Send end-of-backup wire protocol messages. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, | ||||
| 							 TimeLineID endtli) | ||||
| { | ||||
| 	SendCopyDone(); | ||||
| 	SendXlogRecPtrResult(endptr, endtli); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Cleanup. | ||||
|  */ | ||||
| static void | ||||
| bbsink_copystream_cleanup(bbsink *sink) | ||||
| { | ||||
| 	/* Nothing to do. */ | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Create a new 'copytblspc' bbsink. | ||||
|  */ | ||||
|   | ||||
| @@ -54,6 +54,16 @@ typedef struct TablespaceList | ||||
| 	TablespaceListCell *tail; | ||||
| } TablespaceList; | ||||
|  | ||||
| typedef struct ArchiveStreamState | ||||
| { | ||||
| 	int			tablespacenum; | ||||
| 	bbstreamer *streamer; | ||||
| 	bbstreamer *manifest_inject_streamer; | ||||
| 	PQExpBuffer manifest_buffer; | ||||
| 	char		manifest_filename[MAXPGPATH]; | ||||
| 	FILE	   *manifest_file; | ||||
| } ArchiveStreamState; | ||||
|  | ||||
| typedef struct WriteTarState | ||||
| { | ||||
| 	int			tablespacenum; | ||||
| @@ -174,6 +184,13 @@ static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, | ||||
| 										bbstreamer **manifest_inject_streamer_p, | ||||
| 										bool is_recovery_guc_supported, | ||||
| 										bool expect_unterminated_tarfile); | ||||
| static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, | ||||
| 									  void *callback_data); | ||||
| static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); | ||||
| static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor); | ||||
| static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor); | ||||
| static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor); | ||||
| static void ReportCopyDataParseError(size_t r, char *copybuf); | ||||
| static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, | ||||
| 						   bool tablespacenum); | ||||
| static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data); | ||||
| @@ -1097,6 +1114,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation, | ||||
| 	return streamer; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Receive all of the archives the server wants to send - and the backup | ||||
|  * manifest if present - as a single COPY stream. | ||||
|  */ | ||||
| static void | ||||
| ReceiveArchiveStream(PGconn *conn) | ||||
| { | ||||
| 	ArchiveStreamState state; | ||||
|  | ||||
| 	/* Set up initial state. */ | ||||
| 	memset(&state, 0, sizeof(state)); | ||||
| 	state.tablespacenum = -1; | ||||
|  | ||||
| 	/* All the real work happens in ReceiveArchiveStreamChunk. */ | ||||
| 	ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state); | ||||
|  | ||||
| 	/* If we wrote the backup manifest to a file, close the file. */ | ||||
| 	if (state.manifest_file != NULL) | ||||
| 	{ | ||||
| 		fclose(state.manifest_file); | ||||
| 		state.manifest_file = NULL; | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * If we buffered the backup manifest in order to inject it into the | ||||
| 	 * output tarfile, do that now. | ||||
| 	 */ | ||||
| 	if (state.manifest_inject_streamer != NULL && | ||||
| 		state.manifest_buffer != NULL) | ||||
| 	{ | ||||
| 		bbstreamer_inject_file(state.manifest_inject_streamer, | ||||
| 							   "backup_manifest", | ||||
| 							   state.manifest_buffer->data, | ||||
| 							   state.manifest_buffer->len); | ||||
| 		destroyPQExpBuffer(state.manifest_buffer); | ||||
| 		state.manifest_buffer = NULL; | ||||
| 	} | ||||
|  | ||||
| 	/* If there's still an archive in progress, end processing. */ | ||||
| 	if (state.streamer != NULL) | ||||
| 	{ | ||||
| 		bbstreamer_finalize(state.streamer); | ||||
| 		bbstreamer_free(state.streamer); | ||||
| 		state.streamer = NULL; | ||||
| 	} | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Receive one chunk of data sent by the server as part of a single COPY | ||||
|  * stream that includes all archives and the manifest. | ||||
|  */ | ||||
| static void | ||||
| ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) | ||||
| { | ||||
| 	ArchiveStreamState *state = callback_data; | ||||
| 	size_t		cursor = 0; | ||||
|  | ||||
| 	/* Each CopyData message begins with a type byte. */ | ||||
| 	switch (GetCopyDataByte(r, copybuf, &cursor)) | ||||
| 	{ | ||||
| 		case 'n': | ||||
| 			{ | ||||
| 				/* New archive. */ | ||||
| 				char	   *archive_name; | ||||
| 				char	   *spclocation; | ||||
|  | ||||
| 				/* | ||||
| 				 * We force a progress report at the end of each tablespace. A | ||||
| 				 * new tablespace starts when the previous one ends, except in | ||||
| 				 * the case of the very first one. | ||||
| 				 */ | ||||
| 				if (++state->tablespacenum > 0) | ||||
| 					progress_report(state->tablespacenum, true, false); | ||||
|  | ||||
| 				/* Sanity check. */ | ||||
| 				if (state->manifest_buffer != NULL || | ||||
| 					state->manifest_file != NULL) | ||||
| 				{ | ||||
| 					pg_log_error("archives should precede manifest"); | ||||
| 					exit(1); | ||||
| 				} | ||||
|  | ||||
| 				/* Parse the rest of the CopyData message. */ | ||||
| 				archive_name = GetCopyDataString(r, copybuf, &cursor); | ||||
| 				spclocation = GetCopyDataString(r, copybuf, &cursor); | ||||
| 				GetCopyDataEnd(r, copybuf, cursor); | ||||
|  | ||||
| 				/* | ||||
| 				 * Basic sanity checks on the archive name: it shouldn't be | ||||
| 				 * empty, it shouldn't start with a dot, and it shouldn't | ||||
| 				 * contain a path separator. | ||||
| 				 */ | ||||
| 				if (archive_name[0] == '\0' || archive_name[0] == '.' || | ||||
| 					strchr(archive_name, '/') != NULL || | ||||
| 					strchr(archive_name, '\\') != NULL) | ||||
| 				{ | ||||
| 					pg_log_error("invalid archive name: \"%s\"", | ||||
| 								 archive_name); | ||||
| 					exit(1); | ||||
| 				} | ||||
|  | ||||
| 				/* | ||||
| 				 * An empty spclocation is treated as NULL. We expect this | ||||
| 				 * case to occur for the data directory itself, but not for | ||||
| 				 * any archives that correspond to tablespaces. | ||||
| 				 */ | ||||
| 				if (spclocation[0] == '\0') | ||||
| 					spclocation = NULL; | ||||
|  | ||||
| 				/* End processing of any prior archive. */ | ||||
| 				if (state->streamer != NULL) | ||||
| 				{ | ||||
| 					bbstreamer_finalize(state->streamer); | ||||
| 					bbstreamer_free(state->streamer); | ||||
| 					state->streamer = NULL; | ||||
| 				} | ||||
|  | ||||
| 				/* | ||||
| 				 * Create an appropriate backup streamer. We know that | ||||
| 				 * recovery GUCs are supported, because this protocol can only | ||||
| 				 * be used on v15+. | ||||
| 				 */ | ||||
| 				state->streamer = | ||||
| 					CreateBackupStreamer(archive_name, | ||||
| 										 spclocation, | ||||
| 										 &state->manifest_inject_streamer, | ||||
| 										 true, false); | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
| 		case 'd': | ||||
| 			{ | ||||
| 				/* Archive or manifest data. */ | ||||
| 				if (state->manifest_buffer != NULL) | ||||
| 				{ | ||||
| 					/* Manifest data, buffer in memory. */ | ||||
| 					appendBinaryPQExpBuffer(state->manifest_buffer, | ||||
| 											copybuf + 1, r - 1); | ||||
| 				} | ||||
| 				else if (state->manifest_file != NULL) | ||||
| 				{ | ||||
| 					/* Manifest data, write to disk. */ | ||||
| 					if (fwrite(copybuf + 1, r - 1, 1, | ||||
| 							   state->manifest_file) != 1) | ||||
| 					{ | ||||
| 						/* | ||||
| 						 * If fwrite() didn't set errno, assume that the | ||||
| 						 * problem is that we're out of disk space. | ||||
| 						 */ | ||||
| 						if (errno == 0) | ||||
| 							errno = ENOSPC; | ||||
| 						pg_log_error("could not write to file \"%s\": %m", | ||||
| 									 state->manifest_filename); | ||||
| 						exit(1); | ||||
| 					} | ||||
| 				} | ||||
| 				else if (state->streamer != NULL) | ||||
| 				{ | ||||
| 					/* Archive data. */ | ||||
| 					bbstreamer_content(state->streamer, NULL, copybuf + 1, | ||||
| 									   r - 1, BBSTREAMER_UNKNOWN); | ||||
| 				} | ||||
| 				else | ||||
| 				{ | ||||
| 					pg_log_error("unexpected payload data"); | ||||
| 					exit(1); | ||||
| 				} | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
| 		case 'p': | ||||
| 			{ | ||||
| 				/* | ||||
| 				 * Progress report. | ||||
| 				 * | ||||
| 				 * The remainder of the message is expected to be an 8-byte | ||||
| 				 * count of bytes completed. | ||||
| 				 */ | ||||
| 				totaldone = GetCopyDataUInt64(r, copybuf, &cursor); | ||||
| 				GetCopyDataEnd(r, copybuf, cursor); | ||||
|  | ||||
| 				/* | ||||
| 				 * The server shouldn't send progress report messages too | ||||
| 				 * often, so we force an update each time we receive one. | ||||
| 				 */ | ||||
| 				progress_report(state->tablespacenum, true, false); | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
| 		case 'm': | ||||
| 			{ | ||||
| 				/* | ||||
| 				 * Manifest data will be sent next. This message is not | ||||
| 				 * expected to have any further payload data. | ||||
| 				 */ | ||||
| 				GetCopyDataEnd(r, copybuf, cursor); | ||||
|  | ||||
| 				/* | ||||
| 				 * If we're supposed to inject the manifest into the archive, we | ||||
| 				 * prepare to buffer it in memory; otherwise, we prepare to | ||||
| 				 * write it to a temporary file. | ||||
| 				 */ | ||||
| 				if (state->manifest_inject_streamer != NULL) | ||||
| 					state->manifest_buffer = createPQExpBuffer(); | ||||
| 				else | ||||
| 				{ | ||||
| 					snprintf(state->manifest_filename, | ||||
| 							 sizeof(state->manifest_filename), | ||||
| 							 "%s/backup_manifest.tmp", basedir); | ||||
| 					state->manifest_file = | ||||
| 						fopen(state->manifest_filename, "wb"); | ||||
| 					if (state->manifest_file == NULL) | ||||
| 					{ | ||||
| 						pg_log_error("could not create file \"%s\": %m", | ||||
| 									 state->manifest_filename); | ||||
| 						exit(1); | ||||
| 					} | ||||
| 				} | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
| 		default: | ||||
| 			ReportCopyDataParseError(r, copybuf); | ||||
| 			break; | ||||
| 	} | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Get a single byte from a CopyData message. | ||||
|  * | ||||
|  * Bail out if none remain. | ||||
|  */ | ||||
| static char | ||||
| GetCopyDataByte(size_t r, char *copybuf, size_t *cursor) | ||||
| { | ||||
| 	if (*cursor >= r) | ||||
| 		ReportCopyDataParseError(r, copybuf); | ||||
|  | ||||
| 	return copybuf[(*cursor)++]; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Get a NUL-terminated string from a CopyData message. | ||||
|  * | ||||
|  * Bail out if the terminating NUL cannot be found. | ||||
|  */ | ||||
| static char * | ||||
| GetCopyDataString(size_t r, char *copybuf, size_t *cursor) | ||||
| { | ||||
| 	size_t		startpos = *cursor; | ||||
| 	size_t		endpos = startpos; | ||||
|  | ||||
| 	while (1) | ||||
| 	{ | ||||
| 		if (endpos >= r) | ||||
| 			ReportCopyDataParseError(r, copybuf); | ||||
| 		if (copybuf[endpos] == '\0') | ||||
| 			break; | ||||
| 		++endpos; | ||||
| 	} | ||||
|  | ||||
| 	*cursor = endpos + 1; | ||||
| 	return &copybuf[startpos]; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Get an unsigned 64-bit integer from a CopyData message. | ||||
|  * | ||||
|  * Bail out if there are not at least 8 bytes remaining. | ||||
|  */ | ||||
| static uint64 | ||||
| GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor) | ||||
| { | ||||
| 	uint64		result; | ||||
|  | ||||
| 	if (*cursor + sizeof(uint64) > r) | ||||
| 		ReportCopyDataParseError(r, copybuf); | ||||
| 	memcpy(&result, &copybuf[*cursor], sizeof(uint64)); | ||||
| 	*cursor += sizeof(uint64); | ||||
| 	return pg_ntoh64(result); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Bail out if we didn't parse the whole message. | ||||
|  */ | ||||
| static void | ||||
| GetCopyDataEnd(size_t r, char *copybuf, size_t cursor) | ||||
| { | ||||
| 	if (r != cursor) | ||||
| 		ReportCopyDataParseError(r, copybuf); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Report failure to parse a CopyData message from the server. Then exit. | ||||
|  * | ||||
|  * As a debugging aid, we try to give some hint about what kind of message | ||||
|  * provoked the failure. Perhaps this is not detailed enough, but it's not | ||||
|  * clear that it's worth expending any more code on what should be a | ||||
|  * can't-happen case. | ||||
|  */ | ||||
| static void | ||||
| ReportCopyDataParseError(size_t r, char *copybuf) | ||||
| { | ||||
| 	if (r == 0) | ||||
| 		pg_log_error("empty COPY message"); | ||||
| 	else | ||||
| 		pg_log_error("malformed COPY message of type %d, length %zu", | ||||
| 					 copybuf[0], r); | ||||
| 	exit(1); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Receive raw tar data from the server, and stream it to the appropriate | ||||
|  * location. If we're writing a single tarfile to standard output, also | ||||
| @@ -1376,6 +1704,10 @@ BaseBackup(void) | ||||
| 										 "MANIFEST_CHECKSUMS", manifest_checksums); | ||||
| 	} | ||||
|  | ||||
| 	if (serverMajor >= 1500) | ||||
| 		AppendStringCommandOption(&buf, use_new_option_syntax, | ||||
| 								  "TARGET", "client"); | ||||
|  | ||||
| 	if (verbose) | ||||
| 		pg_log_info("initiating base backup, waiting for checkpoint to complete"); | ||||
|  | ||||
| @@ -1498,6 +1830,13 @@ BaseBackup(void) | ||||
| 		StartLogStreamer(xlogstart, starttli, sysidentifier); | ||||
| 	} | ||||
|  | ||||
| 	if (serverMajor >= 1500) | ||||
| 	{ | ||||
| 		/* Receive a single tar stream with everything. */ | ||||
| 		ReceiveArchiveStream(conn); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		/* Receive a tar file for each tablespace in turn */ | ||||
| 		for (i = 0; i < PQntuples(res); i++) | ||||
| 		{ | ||||
| @@ -1505,10 +1844,11 @@ BaseBackup(void) | ||||
| 			char	   *spclocation; | ||||
|  | ||||
| 			/* | ||||
| 		 * If we write the data out to a tar file, it will be named base.tar | ||||
| 		 * if it's the main data directory or <tablespaceoid>.tar if it's for | ||||
| 		 * another tablespace. CreateBackupStreamer() will arrange to add .gz | ||||
| 		 * to the archive name if pg_basebackup is performing compression. | ||||
| 			 * If we write the data out to a tar file, it will be named | ||||
| 			 * base.tar if it's the main data directory or <tablespaceoid>.tar | ||||
| 			 * if it's for another tablespace. CreateBackupStreamer() will | ||||
| 			 * arrange to add .gz to the archive name if pg_basebackup is | ||||
| 			 * performing compression. | ||||
| 			 */ | ||||
| 			if (PQgetisnull(res, i, 0)) | ||||
| 			{ | ||||
| @@ -1528,15 +1868,17 @@ BaseBackup(void) | ||||
| 		/* | ||||
| 		 * Now receive backup manifest, if appropriate. | ||||
| 		 * | ||||
| 	 * If we're writing a tarfile to stdout, ReceiveTarFile will have already | ||||
| 	 * processed the backup manifest and included it in the output tarfile. | ||||
| 	 * Such a configuration doesn't allow for writing multiple files. | ||||
| 		 * If we're writing a tarfile to stdout, ReceiveTarFile will have | ||||
| 		 * already processed the backup manifest and included it in the output | ||||
| 		 * tarfile.  Such a configuration doesn't allow for writing multiple | ||||
| 		 * files. | ||||
| 		 * | ||||
| 	 * If we're talking to an older server, it won't send a backup manifest, | ||||
| 	 * so don't try to receive one. | ||||
| 		 * If we're talking to an older server, it won't send a backup | ||||
| 		 * manifest, so don't try to receive one. | ||||
| 		 */ | ||||
| 		if (!writing_to_stdout && manifest) | ||||
| 			ReceiveBackupManifest(conn); | ||||
| 	} | ||||
|  | ||||
| 	if (showprogress) | ||||
| 	{ | ||||
|   | ||||
| @@ -282,6 +282,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, | ||||
| extern void bbsink_forward_cleanup(bbsink *sink); | ||||
|  | ||||
| /* Constructors for various types of sinks. */ | ||||
| extern bbsink *bbsink_copystream_new(void); | ||||
| extern bbsink *bbsink_copytblspc_new(void); | ||||
| extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); | ||||
| extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); | ||||
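
The cursor-based helpers in the pg_basebackup hunk above (`GetCopyDataString`, `GetCopyDataUInt64`, `GetCopyDataEnd`) can be illustrated outside the PostgreSQL tree with a minimal standalone sketch. Everything named `sketch_*` is hypothetical and not part of the patch. The sketch returns `false` on a malformed message instead of calling `exit(1)`, and it decodes the 8-byte big-endian count by hand rather than using `pg_ntoh64`. The assumed layout of an 'n' message (type byte, archive name, tablespace location, each NUL-terminated) is inferred from the `archive_name` and `spclocation` arguments shown above, not from the protocol documentation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical helper: fetch one NUL-terminated string starting at *cursor.
 * On success, *out points into buf and *cursor moves past the NUL.
 */
static bool
sketch_get_string(const char *buf, size_t len, size_t *cursor,
				  const char **out)
{
	size_t		start;
	const char *nul;

	assert(cursor != NULL);
	start = *cursor;
	if (start >= len)
		return false;			/* nothing left to read */
	nul = memchr(buf + start, '\0', len - start);
	if (nul == NULL)
		return false;			/* terminator missing */
	*out = buf + start;
	*cursor = (size_t) (nul - buf) + 1;
	return true;
}

/*
 * Hypothetical helper: fetch an unsigned 64-bit integer sent in network
 * (big-endian) byte order, as the 'p' progress message is described above.
 */
static bool
sketch_get_uint64(const char *buf, size_t len, size_t *cursor,
				  uint64_t *out)
{
	uint64_t	value = 0;
	size_t		i;

	assert(cursor != NULL);
	if (*cursor > len || len - *cursor < 8)
		return false;			/* fewer than 8 bytes remain */
	for (i = 0; i < 8; i++)
		value = (value << 8) | (unsigned char) buf[*cursor + i];
	*cursor += 8;
	*out = value;
	return true;
}

/* Hypothetical helper: true only if the whole message was consumed. */
static bool
sketch_at_end(size_t len, size_t cursor)
{
	return cursor == len;
}
```

A caller would start with the cursor at 1 (just past the type byte), read the fields the message type calls for, and then check `sketch_at_end` so that trailing garbage is rejected, mirroring how the 'p' and 'm' cases call `GetCopyDataEnd`.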
|   | ||||