diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 34a7034282a..7e59edb1cc5 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2630,6 +2630,22 @@ The commands accepted in replication mode are:
+
+ TARGET 'target'
+
+
+ Tells the server where to send the backup. If not specified,
+ the legacy base backup protocol will be used. Otherwise, the new
+ protocol will be used, as described below.
+
+
+
+ At present, the only supported value for this parameter is
+ client.
+
+
+
+
PROGRESS [ boolean ]
@@ -2805,19 +2821,113 @@ The commands accepted in replication mode are:
After the second regular result set, one or more CopyOutResponse results
- will be sent, one for the main data directory and one for each additional tablespace other
- than pg_default and pg_global. The data in
- the CopyOutResponse results will be a tar format (following the
- ustar interchange format
specified in the POSIX 1003.1-2008
- standard) dump of the tablespace contents. Prior to
+ will be sent. If the TARGET option is not specified,
+ the legacy base backup protocol will be used. In this mode,
+ there will be one CopyOutResponse for the main directory, one for each
+ additional tablespace other than pg_default and
+ pg_global, and one for the backup manifested if
+ requested. The main data directory and any additional tablespaces will
+ be sent in tar format (following the ustar interchange
+ format
specified in the POSIX 1003.1-2008 standard), and
+ the manifest will sent as a plain file. Prior to
PostgreSQL 15, the server omitted the two trailing
blocks of zeroes specified in the standard, but this is no longer the
case.
- After the tar data is complete, and if a backup manifest was requested,
- another CopyOutResponse result is sent, containing the manifest data for the
- current base backup. In any case, a final ordinary result set will be
- sent, containing the WAL end position of the backup, in the same format as
- the start position.
+
+
+
+ New applications should specify the TARGET option.
+ When that option is used, a single CopyOutResponse will be sent, and
+ the payload of each CopyData message will contain a message in one of
+ the following formats:
+
+
+
+
+
+
+ new archive (B)
+
+
+ Byte1('n')
+
+ Identifes the messaage as indicating the start of a new archive.
+
+
+
+ String
+
+ The file name for this archive.
+
+
+
+ String
+
+ For the main data directory, an empty string. For other
+ tablespaces, the full path to the directory from which this
+ archive was created.
+
+
+
+
+
+
+ manifest (B)
+
+
+ Byte1('m')
+
+ Identifes the message as indicating the start of the backup
+ manifest.
+
+
+
+
+
+
+ archive or manifest data (B)
+
+
+ Byte1('d')
+
+ Identifes the message as containing archive or manifest data.
+
+
+
+ Byten
+
+ Data bytes.
+
+
+
+
+
+
+ progress report (B)
+
+
+ Byte1('p')
+
+ Identifes the message as a progress report.
+
+
+
+ Int64
+
+ The number of bytes from the current tablespace for which
+ processing has been completed.
+
+
+
+
+
+
+
+
+
+ After the CopyOutResponse, or all such responses, have been sent, a
+ final ordinary result set will be sent, containing the WAL end position
+ of the backup, in the same format as the start position.
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 53dedc73c2a..3afbbe7e02e 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -53,6 +53,12 @@
*/
#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ)
+typedef enum
+{
+ BACKUP_TARGET_COMPAT,
+ BACKUP_TARGET_CLIENT
+} backup_target_type;
+
typedef struct
{
const char *label;
@@ -62,6 +68,7 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ backup_target_type target;
backup_manifest_option manifest;
pg_checksum_type manifest_checksum_type;
} basebackup_options;
@@ -694,8 +701,10 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_noverify_checksums = false;
bool o_manifest = false;
bool o_manifest_checksums = false;
+ bool o_target = false;
MemSet(opt, 0, sizeof(*opt));
+ opt->target = BACKUP_TARGET_COMPAT;
opt->manifest = MANIFEST_OPTION_NO;
opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C;
@@ -836,6 +845,22 @@ parse_basebackup_options(List *options, basebackup_options *opt)
optval)));
o_manifest_checksums = true;
}
+ else if (strcmp(defel->defname, "target") == 0)
+ {
+ char *optval = defGetString(defel);
+
+ if (o_target)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ if (strcmp(optval, "client") == 0)
+ opt->target = BACKUP_TARGET_CLIENT;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized target: \"%s\"", optval)));
+ o_target = true;
+ }
else
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
@@ -881,8 +906,15 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg);
}
- /* Create a basic basebackup sink. */
- sink = bbsink_copytblspc_new();
+ /*
+ * If the TARGET option was specified, then we can use the new copy-stream
+ * protocol. If not, we must fall back to the old and less capable
+ * copy-tablespace protocol.
+ */
+ if (opt.target != BACKUP_TARGET_COMPAT)
+ sink = bbsink_copystream_new();
+ else
+ sink = bbsink_copytblspc_new();
/* Set up network throttling, if client requested it */
if (opt.maxrate > 0)
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
index abacc35fcd2..f42b368c033 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -1,8 +1,27 @@
/*-------------------------------------------------------------------------
*
* basebackup_copy.c
- * send basebackup archives using one COPY OUT operation per
- * tablespace, and an additional COPY OUT for the backup manifest
+ * send basebackup archives using COPY OUT
+ *
+ * We have two different ways of doing this.
+ *
+ * 'copytblspc' is an older method still supported for compatibility
+ * with releases prior to v15. In this method, a separate COPY OUT
+ * operation is used for each tablespace. The manifest, if it is sent,
+ * uses an additional COPY OUT operation.
+ *
+ * 'copystream' sends a starts a single COPY OUT operation and transmits
+ * all the archives and the manifest if present during the course of that
+ * single COPY OUT. Each CopyData message begins with a type byte,
+ * allowing us to signal the start of a new archive, or the manifest,
+ * by some means other than ending the COPY stream. This also allows
+ * this protocol to be extended more easily, since we can include
+ * arbitrary information in the message stream as long as we're certain
+ * that the client will know what to do with it.
+ *
+ * Regardless of which method is used, we sent a result set with
+ * information about the tabelspaces to be included in the backup before
+ * starting COPY OUT. This result has the same format in every method.
*
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
*
@@ -18,6 +37,52 @@
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_copystream
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /*
+ * Protocol message buffer. We assemble CopyData protocol messages by
+ * setting the first character of this buffer to 'd' (archive or manifest
+ * data) and then making base.bbs_buffer point to the second character so
+ * that the rest of the data gets copied into the message just where we
+ * want it.
+ */
+ char *msgbuffer;
+
+ /*
+ * When did we last report progress to the client, and how much progress
+ * did we report?
+ */
+ TimestampTz last_progress_report_time;
+ uint64 bytes_done_at_last_time_check;
+} bbsink_copystream;
+
+/*
+ * We don't want to send progress messages to the client excessively
+ * frequently. Ideally, we'd like to send a message when the time since the
+ * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
+ * the system time every time we send a tiny bit of data seems too expensive.
+ * So we only check it after the number of bytes sine the last check reaches
+ * PROGRESS_REPORT_BYTE_INTERVAL.
+ */
+#define PROGRESS_REPORT_BYTE_INTERVAL 65536
+#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
+
+static void bbsink_copystream_begin_backup(bbsink *sink);
+static void bbsink_copystream_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_archive(bbsink *sink);
+static void bbsink_copystream_begin_manifest(bbsink *sink);
+static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_manifest(bbsink *sink);
+static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+static void bbsink_copystream_cleanup(bbsink *sink);
static void bbsink_copytblspc_begin_backup(bbsink *sink);
static void bbsink_copytblspc_begin_archive(bbsink *sink,
@@ -38,6 +103,18 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static void SendTablespaceList(List *tablespaces);
static void send_int8_string(StringInfoData *buf, int64 intval);
+const bbsink_ops bbsink_copystream_ops = {
+ .begin_backup = bbsink_copystream_begin_backup,
+ .begin_archive = bbsink_copystream_begin_archive,
+ .archive_contents = bbsink_copystream_archive_contents,
+ .end_archive = bbsink_copystream_end_archive,
+ .begin_manifest = bbsink_copystream_begin_manifest,
+ .manifest_contents = bbsink_copystream_manifest_contents,
+ .end_manifest = bbsink_copystream_end_manifest,
+ .end_backup = bbsink_copystream_end_backup,
+ .cleanup = bbsink_copystream_cleanup
+};
+
const bbsink_ops bbsink_copytblspc_ops = {
.begin_backup = bbsink_copytblspc_begin_backup,
.begin_archive = bbsink_copytblspc_begin_archive,
@@ -50,6 +127,202 @@ const bbsink_ops bbsink_copytblspc_ops = {
.cleanup = bbsink_copytblspc_cleanup
};
+/*
+ * Create a new 'copystream' bbsink.
+ */
+bbsink *
+bbsink_copystream_new(void)
+{
+ bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
+
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
+
+ /* Set up for periodic progress reporting. */
+ sink->last_progress_report_time = GetCurrentTimestamp();
+ sink->bytes_done_at_last_time_check = UINT64CONST(0);
+
+ return &sink->base;
+}
+
+/*
+ * Send start-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_begin_backup(bbsink *sink)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = sink->bbs_state;
+
+ /*
+ * Initialize buffer. We ultimately want to send the archive and manifest
+ * data by means of CopyData messages where the payload portion of each
+ * message begins with a type byte, so we set up a buffer that begins with
+ * a the type byte we're going to need, and then arrange things so that
+ * the data we're given will be written just after that type byte. That
+ * will allow us to ship the data with a single call to pq_putmessage and
+ * without needing any extra copying.
+ */
+ mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1);
+ mysink->base.bbs_buffer = mysink->msgbuffer + 1;
+ mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+
+ /* Tell client the backup start location. */
+ SendXlogRecPtrResult(state->startptr, state->starttli);
+
+ /* Send client a list of tablespaces. */
+ SendTablespaceList(state->tablespaces);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+
+ /* Begin COPY stream. This will be used for all archives + manifest. */
+ SendCopyOutResponse();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of a new archive.
+ */
+static void
+bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_state *state = sink->bbs_state;
+ tablespaceinfo *ti;
+ StringInfoData buf;
+
+ ti = list_nth(state->tablespaces, state->tablespace_num);
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'n'); /* New archive */
+ pq_sendstring(&buf, archive_name);
+ pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
+ pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyData message containing a chunk of archive content.
+ */
+static void
+bbsink_copystream_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = mysink->base.bbs_state;
+ StringInfoData buf;
+ uint64 targetbytes;
+
+ /* Send the archive content to the client (with leading type byte). */
+ pq_putmessage('d', mysink->msgbuffer, len + 1);
+
+ /* Consider whether to send a progress report to the client. */
+ targetbytes = mysink->bytes_done_at_last_time_check
+ + PROGRESS_REPORT_BYTE_INTERVAL;
+ if (targetbytes <= state->bytes_done)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ long ms;
+
+ /*
+ * OK, we've sent a decent number of bytes, so check the system time
+ * to see whether we're due to send a progress report.
+ */
+ mysink->bytes_done_at_last_time_check = state->bytes_done;
+ ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
+ now);
+
+ /*
+ * Send a progress report if enough time has passed. Also send one if
+ * the system clock was set backward, so that such occurrences don't
+ * have the effect of suppressing further progress messages.
+ */
+ if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
+ {
+ mysink->last_progress_report_time = now;
+
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendint64(&buf, state->bytes_done);
+ pq_endmessage(&buf);
+ pq_flush_if_writable();
+ }
+ }
+}
+
+/*
+ * We don't need to explicitly signal the end of the archive; the client
+ * will figure out that we've reached the end when we begin the next one,
+ * or begin the manifest, or end the COPY stream. However, this seems like
+ * a good time to force out a progress report. One reason for that is that
+ * if this is the last archive, and we don't force a progress report now,
+ * the client will never be told that we sent all the bytes.
+ */
+static void
+bbsink_copystream_end_archive(bbsink *sink)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = mysink->base.bbs_state;
+ StringInfoData buf;
+
+ mysink->bytes_done_at_last_time_check = state->bytes_done;
+ mysink->last_progress_report_time = GetCurrentTimestamp();
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendint64(&buf, state->bytes_done);
+ pq_endmessage(&buf);
+ pq_flush_if_writable();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of the backup manifest.
+ */
+static void
+bbsink_copystream_begin_manifest(bbsink *sink)
+{
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'm'); /* Manifest */
+ pq_endmessage(&buf);
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+
+ /* Send the manifest content to the client (with leading type byte). */
+ pq_putmessage('d', mysink->msgbuffer, len + 1);
+}
+
+/*
+ * We don't need an explicit terminator for the backup manifest.
+ */
+static void
+bbsink_copystream_end_manifest(bbsink *sink)
+{
+ /* Do nothing. */
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ SendCopyDone();
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ */
+static void
+bbsink_copystream_cleanup(bbsink *sink)
+{
+ /* Nothing to do. */
+}
+
/*
* Create a new 'copytblspc' bbsink.
*/
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index aa43fc09241..2a58be638a5 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -54,6 +54,16 @@ typedef struct TablespaceList
TablespaceListCell *tail;
} TablespaceList;
+typedef struct ArchiveStreamState
+{
+ int tablespacenum;
+ bbstreamer *streamer;
+ bbstreamer *manifest_inject_streamer;
+ PQExpBuffer manifest_buffer;
+ char manifest_filename[MAXPGPATH];
+ FILE *manifest_file;
+} ArchiveStreamState;
+
typedef struct WriteTarState
{
int tablespacenum;
@@ -174,6 +184,13 @@ static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
bbstreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile);
+static void ReceiveArchiveStreamChunk(size_t r, char *copybuf,
+ void *callback_data);
+static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor);
+static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor);
+static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor);
+static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor);
+static void ReportCopyDataParseError(size_t r, char *copybuf);
static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
bool tablespacenum);
static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
@@ -1097,6 +1114,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
return streamer;
}
+/*
+ * Receive all of the archives the server wants to send - and the backup
+ * manifest if present - as a single COPY stream.
+ */
+static void
+ReceiveArchiveStream(PGconn *conn)
+{
+ ArchiveStreamState state;
+
+ /* Set up initial state. */
+ memset(&state, 0, sizeof(state));
+ state.tablespacenum = -1;
+
+ /* All the real work happens in ReceiveArchiveStreamChunk. */
+ ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state);
+
+ /* If we wrote the backup manifest to a file, close the file. */
+ if (state.manifest_file !=NULL)
+ {
+ fclose(state.manifest_file);
+ state.manifest_file = NULL;
+ }
+
+ /*
+ * If we buffered the backup manifest in order to inject it into the
+ * output tarfile, do that now.
+ */
+ if (state.manifest_inject_streamer != NULL &&
+ state.manifest_buffer != NULL)
+ {
+ bbstreamer_inject_file(state.manifest_inject_streamer,
+ "backup_manifest",
+ state.manifest_buffer->data,
+ state.manifest_buffer->len);
+ destroyPQExpBuffer(state.manifest_buffer);
+ state.manifest_buffer = NULL;
+ }
+
+ /* If there's still an archive in progress, end processing. */
+ if (state.streamer != NULL)
+ {
+ bbstreamer_finalize(state.streamer);
+ bbstreamer_free(state.streamer);
+ state.streamer = NULL;
+ }
+}
+
+/*
+ * Receive one chunk of data sent by the server as part of a single COPY
+ * stream that includes all archives and the manifest.
+ */
+static void
+ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
+{
+ ArchiveStreamState *state = callback_data;
+ size_t cursor = 0;
+
+ /* Each CopyData message begins with a type byte. */
+ switch (GetCopyDataByte(r, copybuf, &cursor))
+ {
+ case 'n':
+ {
+ /* New archive. */
+ char *archive_name;
+ char *spclocation;
+
+ /*
+ * We force a progress report at the end of each tablespace. A
+ * new tablespace starts when the previous one ends, except in
+ * the case of the very first one.
+ */
+ if (++state->tablespacenum > 0)
+ progress_report(state->tablespacenum, true, false);
+
+ /* Sanity check. */
+ if (state->manifest_buffer != NULL ||
+ state->manifest_file !=NULL)
+ {
+ pg_log_error("archives should precede manifest");
+ exit(1);
+ }
+
+ /* Parse the rest of the CopyData message. */
+ archive_name = GetCopyDataString(r, copybuf, &cursor);
+ spclocation = GetCopyDataString(r, copybuf, &cursor);
+ GetCopyDataEnd(r, copybuf, cursor);
+
+ /*
+ * Basic sanity checks on the archive name: it shouldn't be
+ * empty, it shouldn't start with a dot, and it shouldn't
+ * contain a path separator.
+ */
+ if (archive_name[0] == '\0' || archive_name[0] == '.' ||
+ strchr(archive_name, '/') != NULL ||
+ strchr(archive_name, '\\') != NULL)
+ {
+ pg_log_error("invalid archive name: \"%s\"",
+ archive_name);
+ exit(1);
+ }
+
+ /*
+ * An empty spclocation is treated as NULL. We expect this
+ * case to occur for the data directory itself, but not for
+ * any archives that correspond to tablespaces.
+ */
+ if (spclocation[0] == '\0')
+ spclocation = NULL;
+
+ /* End processing of any prior archive. */
+ if (state->streamer != NULL)
+ {
+ bbstreamer_finalize(state->streamer);
+ bbstreamer_free(state->streamer);
+ state->streamer = NULL;
+ }
+
+ /*
+ * Create an appropriate backup streamer. We know that
+ * recovery GUCs are supported, because this protocol can only
+ * be used on v15+.
+ */
+ state->streamer =
+ CreateBackupStreamer(archive_name,
+ spclocation,
+ &state->manifest_inject_streamer,
+ true, false);
+ break;
+ }
+
+ case 'd':
+ {
+ /* Archive or manifest data. */
+ if (state->manifest_buffer != NULL)
+ {
+ /* Manifest data, buffer in memory. */
+ appendPQExpBuffer(state->manifest_buffer, copybuf + 1,
+ r - 1);
+ }
+ else if (state->manifest_file !=NULL)
+ {
+ /* Manifest data, write to disk. */
+ if (fwrite(copybuf + 1, r - 1, 1,
+ state->manifest_file) != 1)
+ {
+ /*
+ * If fwrite() didn't set errno, assume that the
+ * problem is that we're out of disk space.
+ */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to file \"%s\": %m",
+ state->manifest_filename);
+ exit(1);
+ }
+ }
+ else if (state->streamer != NULL)
+ {
+ /* Archive data. */
+ bbstreamer_content(state->streamer, NULL, copybuf + 1,
+ r - 1, BBSTREAMER_UNKNOWN);
+ }
+ else
+ {
+ pg_log_error("unexpected payload data");
+ exit(1);
+ }
+ break;
+ }
+
+ case 'p':
+ {
+ /*
+ * Progress report.
+ *
+ * The remainder of the message is expected to be an 8-byte
+ * count of bytes completed.
+ */
+ totaldone = GetCopyDataUInt64(r, copybuf, &cursor);
+ GetCopyDataEnd(r, copybuf, cursor);
+
+ /*
+ * The server shouldn't send progres report messages too
+ * often, so we force an update each time we receive one.
+ */
+ progress_report(state->tablespacenum, true, false);
+ break;
+ }
+
+ case 'm':
+ {
+ /*
+ * Manifest data will be sent next. This message is not
+ * expected to have any further payload data.
+ */
+ GetCopyDataEnd(r, copybuf, cursor);
+
+ /*
+ * If we're supposed inject the manifest into the archive, we
+ * prepare to buffer it in memory; otherwise, we prepare to
+ * write it to a temporary file.
+ */
+ if (state->manifest_inject_streamer != NULL)
+ state->manifest_buffer = createPQExpBuffer();
+ else
+ {
+ snprintf(state->manifest_filename,
+ sizeof(state->manifest_filename),
+ "%s/backup_manifest.tmp", basedir);
+ state->manifest_file =
+ fopen(state->manifest_filename, "wb");
+ if (state->manifest_file == NULL)
+ {
+ pg_log_error("could not create file \"%s\": %m",
+ state->manifest_filename);
+ exit(1);
+ }
+ }
+ break;
+ }
+
+ default:
+ ReportCopyDataParseError(r, copybuf);
+ break;
+ }
+}
+
+/*
+ * Get a single byte from a CopyData message.
+ *
+ * Bail out if none remain.
+ */
+static char
+GetCopyDataByte(size_t r, char *copybuf, size_t *cursor)
+{
+ if (*cursor >= r)
+ ReportCopyDataParseError(r, copybuf);
+
+ return copybuf[(*cursor)++];
+}
+
+/*
+ * Get a NUL-terminated string from a CopyData message.
+ *
+ * Bail out if the terminating NUL cannot be found.
+ */
+static char *
+GetCopyDataString(size_t r, char *copybuf, size_t *cursor)
+{
+ size_t startpos = *cursor;
+ size_t endpos = startpos;
+
+ while (1)
+ {
+ if (endpos >= r)
+ ReportCopyDataParseError(r, copybuf);
+ if (copybuf[endpos] == '\0')
+ break;
+ ++endpos;
+ }
+
+ *cursor = endpos + 1;
+ return ©buf[startpos];
+}
+
+/*
+ * Get an unsigned 64-bit integer from a CopyData message.
+ *
+ * Bail out if there are not at least 8 bytes remaining.
+ */
+static uint64
+GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor)
+{
+ uint64 result;
+
+ if (*cursor + sizeof(uint64) > r)
+ ReportCopyDataParseError(r, copybuf);
+ memcpy(&result, ©buf[*cursor], sizeof(uint64));
+ *cursor += sizeof(uint64);
+ return pg_ntoh64(result);
+}
+
+/*
+ * Bail out if we didn't parse the whole message.
+ */
+static void
+GetCopyDataEnd(size_t r, char *copybuf, size_t cursor)
+{
+ if (r != cursor)
+ ReportCopyDataParseError(r, copybuf);
+}
+
+/*
+ * Report failure to parse a CopyData message from the server. Then exit.
+ *
+ * As a debugging aid, we try to give some hint about what kind of message
+ * provoked the failure. Perhaps this is not detailed enough, but it's not
+ * clear that it's worth expending any more code on what shoud be a
+ * can't-happen case.
+ */
+static void
+ReportCopyDataParseError(size_t r, char *copybuf)
+{
+ if (r == 0)
+ pg_log_error("empty COPY message");
+ else
+ pg_log_error("malformed COPY message of type %d, length %zu",
+ copybuf[0], r);
+ exit(1);
+}
+
/*
* Receive raw tar data from the server, and stream it to the appropriate
* location. If we're writing a single tarfile to standard output, also
@@ -1376,6 +1704,10 @@ BaseBackup(void)
"MANIFEST_CHECKSUMS", manifest_checksums);
}
+ if (serverMajor >= 1500)
+ AppendStringCommandOption(&buf, use_new_option_syntax,
+ "TARGET", "client");
+
if (verbose)
pg_log_info("initiating base backup, waiting for checkpoint to complete");
@@ -1498,46 +1830,56 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /* Receive a tar file for each tablespace in turn */
- for (i = 0; i < PQntuples(res); i++)
+ if (serverMajor >= 1500)
{
- char archive_name[MAXPGPATH];
- char *spclocation;
+ /* Receive a single tar stream with everything. */
+ ReceiveArchiveStream(conn);
+ }
+ else
+ {
+ /* Receive a tar file for each tablespace in turn */
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ char archive_name[MAXPGPATH];
+ char *spclocation;
+
+ /*
+ * If we write the data out to a tar file, it will be named
+ * base.tar if it's the main data directory or .tar
+ * if it's for another tablespace. CreateBackupStreamer() will
+ * arrange to add .gz to the archive name if pg_basebackup is
+ * performing compression.
+ */
+ if (PQgetisnull(res, i, 0))
+ {
+ strlcpy(archive_name, "base.tar", sizeof(archive_name));
+ spclocation = NULL;
+ }
+ else
+ {
+ snprintf(archive_name, sizeof(archive_name),
+ "%s.tar", PQgetvalue(res, i, 0));
+ spclocation = PQgetvalue(res, i, 1);
+ }
+
+ ReceiveTarFile(conn, archive_name, spclocation, i);
+ }
/*
- * If we write the data out to a tar file, it will be named base.tar
- * if it's the main data directory or .tar if it's for
- * another tablespace. CreateBackupStreamer() will arrange to add .gz
- * to the archive name if pg_basebackup is performing compression.
+ * Now receive backup manifest, if appropriate.
+ *
+ * If we're writing a tarfile to stdout, ReceiveTarFile will have
+ * already processed the backup manifest and included it in the output
+ * tarfile. Such a configuration doesn't allow for writing multiple
+ * files.
+ *
+ * If we're talking to an older server, it won't send a backup
+ * manifest, so don't try to receive one.
*/
- if (PQgetisnull(res, i, 0))
- {
- strlcpy(archive_name, "base.tar", sizeof(archive_name));
- spclocation = NULL;
- }
- else
- {
- snprintf(archive_name, sizeof(archive_name),
- "%s.tar", PQgetvalue(res, i, 0));
- spclocation = PQgetvalue(res, i, 1);
- }
-
- ReceiveTarFile(conn, archive_name, spclocation, i);
+ if (!writing_to_stdout && manifest)
+ ReceiveBackupManifest(conn);
}
- /*
- * Now receive backup manifest, if appropriate.
- *
- * If we're writing a tarfile to stdout, ReceiveTarFile will have already
- * processed the backup manifest and included it in the output tarfile.
- * Such a configuration doesn't allow for writing multiple files.
- *
- * If we're talking to an older server, it won't send a backup manifest,
- * so don't try to receive one.
- */
- if (!writing_to_stdout && manifest)
- ReceiveBackupManifest(conn);
-
if (showprogress)
{
progress_filename = NULL;
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
index 400ea347f58..25436defa8f 100644
--- a/src/include/replication/basebackup_sink.h
+++ b/src/include/replication/basebackup_sink.h
@@ -282,6 +282,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr,
extern void bbsink_forward_cleanup(bbsink *sink);
/* Constructors for various types of sinks. */
+extern bbsink *bbsink_copystream_new(void);
extern bbsink *bbsink_copytblspc_new(void);
extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);