mirror of
https://github.com/postgres/postgres.git
synced 2025-05-12 16:21:30 +03:00
Run pgindent, pgperltidy, and reformat-dat-files. I manually fixed a couple of comments that pgindent uglified.
474 lines
13 KiB
C
474 lines
13 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* basebackup_copy.c
|
|
* send basebackup archives using COPY OUT
|
|
*
|
|
* We send a result set with information about the tablespaces to be included
|
|
* in the backup before starting COPY OUT. Then, we start a single COPY OUT
|
|
* operation and transmit all the archives and the manifest if present during
|
|
* the course of that single COPY OUT. Each CopyData message begins with a
|
|
* type byte, allowing us to signal the start of a new archive, or the
|
|
* manifest, by some means other than ending the COPY stream. This also allows
|
|
* for future protocol extensions, since we can include arbitrary information
|
|
* in the message stream as long as we're certain that the client will know
|
|
* what to do with it.
|
|
*
|
|
* An older method that sent each archive using a separate COPY OUT
|
|
* operation is no longer supported.
|
|
*
|
|
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/replication/basebackup_copy.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "catalog/pg_type_d.h"
|
|
#include "libpq/libpq.h"
|
|
#include "libpq/pqformat.h"
|
|
#include "replication/basebackup.h"
|
|
#include "replication/basebackup_sink.h"
|
|
#include "utils/timestamp.h"
|
|
|
|
typedef struct bbsink_copystream
|
|
{
|
|
/* Common information for all types of sink. */
|
|
bbsink base;
|
|
|
|
/* Are we sending the archives to the client, or somewhere else? */
|
|
bool send_to_client;
|
|
|
|
/*
|
|
* Protocol message buffer. We assemble CopyData protocol messages by
|
|
* setting the first character of this buffer to 'd' (archive or manifest
|
|
* data) and then making base.bbs_buffer point to the second character so
|
|
* that the rest of the data gets copied into the message just where we
|
|
* want it.
|
|
*/
|
|
char *msgbuffer;
|
|
|
|
/*
|
|
* When did we last report progress to the client, and how much progress
|
|
* did we report?
|
|
*/
|
|
TimestampTz last_progress_report_time;
|
|
uint64 bytes_done_at_last_time_check;
|
|
} bbsink_copystream;
|
|
|
|
/*
 * Progress messages should not be sent to the client excessively often.
 * Ideally one would go out whenever the time since the previous message
 * reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but reading the system clock
 * each time a tiny bit of data is sent seems too expensive. Therefore the
 * clock is only consulted once the number of bytes sent since the last check
 * reaches PROGRESS_REPORT_BYTE_INTERVAL.
 */
#define PROGRESS_REPORT_BYTE_INTERVAL				65536
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD		1000
|
|
|
|
static void bbsink_copystream_begin_backup(bbsink *sink);
|
|
static void bbsink_copystream_begin_archive(bbsink *sink,
|
|
const char *archive_name);
|
|
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
|
|
static void bbsink_copystream_end_archive(bbsink *sink);
|
|
static void bbsink_copystream_begin_manifest(bbsink *sink);
|
|
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
|
|
static void bbsink_copystream_end_manifest(bbsink *sink);
|
|
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
|
|
TimeLineID endtli);
|
|
static void bbsink_copystream_cleanup(bbsink *sink);
|
|
|
|
static void SendCopyOutResponse(void);
|
|
static void SendCopyDone(void);
|
|
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
|
|
static void SendTablespaceList(List *tablespaces);
|
|
static void send_int8_string(StringInfoData *buf, int64 intval);
|
|
|
|
static const bbsink_ops bbsink_copystream_ops = {
|
|
.begin_backup = bbsink_copystream_begin_backup,
|
|
.begin_archive = bbsink_copystream_begin_archive,
|
|
.archive_contents = bbsink_copystream_archive_contents,
|
|
.end_archive = bbsink_copystream_end_archive,
|
|
.begin_manifest = bbsink_copystream_begin_manifest,
|
|
.manifest_contents = bbsink_copystream_manifest_contents,
|
|
.end_manifest = bbsink_copystream_end_manifest,
|
|
.end_backup = bbsink_copystream_end_backup,
|
|
.cleanup = bbsink_copystream_cleanup
|
|
};
|
|
|
|
/*
|
|
* Create a new 'copystream' bbsink.
|
|
*/
|
|
bbsink *
|
|
bbsink_copystream_new(bool send_to_client)
|
|
{
|
|
bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
|
|
|
|
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
|
|
sink->send_to_client = send_to_client;
|
|
|
|
/* Set up for periodic progress reporting. */
|
|
sink->last_progress_report_time = GetCurrentTimestamp();
|
|
sink->bytes_done_at_last_time_check = UINT64CONST(0);
|
|
|
|
return &sink->base;
|
|
}
|
|
|
|
/*
|
|
* Send start-of-backup wire protocol messages.
|
|
*/
|
|
static void
|
|
bbsink_copystream_begin_backup(bbsink *sink)
|
|
{
|
|
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
|
bbsink_state *state = sink->bbs_state;
|
|
char *buf;
|
|
|
|
/*
|
|
* Initialize buffer. We ultimately want to send the archive and manifest
|
|
* data by means of CopyData messages where the payload portion of each
|
|
* message begins with a type byte. However, basebackup.c expects the
|
|
* buffer to be aligned, so we can't just allocate one extra byte for the
|
|
* type byte. Instead, allocate enough extra bytes that the portion of the
|
|
* buffer we reveal to our callers can be aligned, while leaving room to
|
|
* slip the type byte in just beforehand. That will allow us to ship the
|
|
* data with a single call to pq_putmessage and without needing any extra
|
|
* copying.
|
|
*/
|
|
buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
|
|
mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
|
|
mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
|
|
mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
|
|
|
|
/* Tell client the backup start location. */
|
|
SendXlogRecPtrResult(state->startptr, state->starttli);
|
|
|
|
/* Send client a list of tablespaces. */
|
|
SendTablespaceList(state->tablespaces);
|
|
|
|
/* Send a CommandComplete message */
|
|
pq_puttextmessage('C', "SELECT");
|
|
|
|
/* Begin COPY stream. This will be used for all archives + manifest. */
|
|
SendCopyOutResponse();
|
|
}
|
|
|
|
/*
|
|
* Send a CopyData message announcing the beginning of a new archive.
|
|
*/
|
|
static void
|
|
bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
|
|
{
|
|
bbsink_state *state = sink->bbs_state;
|
|
tablespaceinfo *ti;
|
|
StringInfoData buf;
|
|
|
|
ti = list_nth(state->tablespaces, state->tablespace_num);
|
|
pq_beginmessage(&buf, 'd'); /* CopyData */
|
|
pq_sendbyte(&buf, 'n'); /* New archive */
|
|
pq_sendstring(&buf, archive_name);
|
|
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
|
|
pq_endmessage(&buf);
|
|
}
|
|
|
|
/*
|
|
* Send a CopyData message containing a chunk of archive content.
|
|
*/
|
|
static void
|
|
bbsink_copystream_archive_contents(bbsink *sink, size_t len)
|
|
{
|
|
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
|
bbsink_state *state = mysink->base.bbs_state;
|
|
StringInfoData buf;
|
|
uint64 targetbytes;
|
|
|
|
/* Send the archive content to the client, if appropriate. */
|
|
if (mysink->send_to_client)
|
|
{
|
|
/* Add one because we're also sending a leading type byte. */
|
|
pq_putmessage('d', mysink->msgbuffer, len + 1);
|
|
}
|
|
|
|
/* Consider whether to send a progress report to the client. */
|
|
targetbytes = mysink->bytes_done_at_last_time_check
|
|
+ PROGRESS_REPORT_BYTE_INTERVAL;
|
|
if (targetbytes <= state->bytes_done)
|
|
{
|
|
TimestampTz now = GetCurrentTimestamp();
|
|
long ms;
|
|
|
|
/*
|
|
* OK, we've sent a decent number of bytes, so check the system time
|
|
* to see whether we're due to send a progress report.
|
|
*/
|
|
mysink->bytes_done_at_last_time_check = state->bytes_done;
|
|
ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
|
|
now);
|
|
|
|
/*
|
|
* Send a progress report if enough time has passed. Also send one if
|
|
* the system clock was set backward, so that such occurrences don't
|
|
* have the effect of suppressing further progress messages.
|
|
*/
|
|
if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
|
|
{
|
|
mysink->last_progress_report_time = now;
|
|
|
|
pq_beginmessage(&buf, 'd'); /* CopyData */
|
|
pq_sendbyte(&buf, 'p'); /* Progress report */
|
|
pq_sendint64(&buf, state->bytes_done);
|
|
pq_endmessage(&buf);
|
|
pq_flush_if_writable();
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* We don't need to explicitly signal the end of the archive; the client
|
|
* will figure out that we've reached the end when we begin the next one,
|
|
* or begin the manifest, or end the COPY stream. However, this seems like
|
|
* a good time to force out a progress report. One reason for that is that
|
|
* if this is the last archive, and we don't force a progress report now,
|
|
* the client will never be told that we sent all the bytes.
|
|
*/
|
|
static void
|
|
bbsink_copystream_end_archive(bbsink *sink)
|
|
{
|
|
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
|
bbsink_state *state = mysink->base.bbs_state;
|
|
StringInfoData buf;
|
|
|
|
mysink->bytes_done_at_last_time_check = state->bytes_done;
|
|
mysink->last_progress_report_time = GetCurrentTimestamp();
|
|
pq_beginmessage(&buf, 'd'); /* CopyData */
|
|
pq_sendbyte(&buf, 'p'); /* Progress report */
|
|
pq_sendint64(&buf, state->bytes_done);
|
|
pq_endmessage(&buf);
|
|
pq_flush_if_writable();
|
|
}
|
|
|
|
/*
|
|
* Send a CopyData message announcing the beginning of the backup manifest.
|
|
*/
|
|
static void
|
|
bbsink_copystream_begin_manifest(bbsink *sink)
|
|
{
|
|
StringInfoData buf;
|
|
|
|
pq_beginmessage(&buf, 'd'); /* CopyData */
|
|
pq_sendbyte(&buf, 'm'); /* Manifest */
|
|
pq_endmessage(&buf);
|
|
}
|
|
|
|
/*
|
|
* Each chunk of manifest data is sent using a CopyData message.
|
|
*/
|
|
static void
|
|
bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
|
|
{
|
|
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
|
|
|
if (mysink->send_to_client)
|
|
{
|
|
/* Add one because we're also sending a leading type byte. */
|
|
pq_putmessage('d', mysink->msgbuffer, len + 1);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* We don't need an explicit terminator for the backup manifest.
|
|
*/
|
|
static void
|
|
bbsink_copystream_end_manifest(bbsink *sink)
|
|
{
|
|
/* Do nothing. */
|
|
}
|
|
|
|
/*
|
|
* Send end-of-backup wire protocol messages.
|
|
*/
|
|
static void
|
|
bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
|
|
TimeLineID endtli)
|
|
{
|
|
SendCopyDone();
|
|
SendXlogRecPtrResult(endptr, endtli);
|
|
}
|
|
|
|
/*
|
|
* Cleanup.
|
|
*/
|
|
static void
|
|
bbsink_copystream_cleanup(bbsink *sink)
|
|
{
|
|
/* Nothing to do. */
|
|
}
|
|
|
|
/*
|
|
* Send a CopyOutResponse message.
|
|
*/
|
|
static void
|
|
SendCopyOutResponse(void)
|
|
{
|
|
StringInfoData buf;
|
|
|
|
pq_beginmessage(&buf, 'H');
|
|
pq_sendbyte(&buf, 0); /* overall format */
|
|
pq_sendint16(&buf, 0); /* natts */
|
|
pq_endmessage(&buf);
|
|
}
|
|
|
|
/*
 * Send a CopyDone ('c') message, which has no payload.
 */
static void
SendCopyDone(void)
{
	pq_putemptymessage('c');
}
|
|
|
|
/*
|
|
* Send a single resultset containing just a single
|
|
* XLogRecPtr record (in text format)
|
|
*/
|
|
static void
|
|
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
|
|
{
|
|
StringInfoData buf;
|
|
char str[MAXFNAMELEN];
|
|
Size len;
|
|
|
|
pq_beginmessage(&buf, 'T'); /* RowDescription */
|
|
pq_sendint16(&buf, 2); /* 2 fields */
|
|
|
|
/* Field headers */
|
|
pq_sendstring(&buf, "recptr");
|
|
pq_sendint32(&buf, 0); /* table oid */
|
|
pq_sendint16(&buf, 0); /* attnum */
|
|
pq_sendint32(&buf, TEXTOID); /* type oid */
|
|
pq_sendint16(&buf, -1);
|
|
pq_sendint32(&buf, 0);
|
|
pq_sendint16(&buf, 0);
|
|
|
|
pq_sendstring(&buf, "tli");
|
|
pq_sendint32(&buf, 0); /* table oid */
|
|
pq_sendint16(&buf, 0); /* attnum */
|
|
|
|
/*
|
|
* int8 may seem like a surprising data type for this, but in theory int4
|
|
* would not be wide enough for this, as TimeLineID is unsigned.
|
|
*/
|
|
pq_sendint32(&buf, INT8OID); /* type oid */
|
|
pq_sendint16(&buf, -1);
|
|
pq_sendint32(&buf, 0);
|
|
pq_sendint16(&buf, 0);
|
|
pq_endmessage(&buf);
|
|
|
|
/* Data row */
|
|
pq_beginmessage(&buf, 'D');
|
|
pq_sendint16(&buf, 2); /* number of columns */
|
|
|
|
len = snprintf(str, sizeof(str),
|
|
"%X/%X", LSN_FORMAT_ARGS(ptr));
|
|
pq_sendint32(&buf, len);
|
|
pq_sendbytes(&buf, str, len);
|
|
|
|
len = snprintf(str, sizeof(str), "%u", tli);
|
|
pq_sendint32(&buf, len);
|
|
pq_sendbytes(&buf, str, len);
|
|
|
|
pq_endmessage(&buf);
|
|
|
|
/* Send a CommandComplete message */
|
|
pq_puttextmessage('C', "SELECT");
|
|
}
|
|
|
|
/*
|
|
* Send a result set via libpq describing the tablespace list.
|
|
*/
|
|
static void
|
|
SendTablespaceList(List *tablespaces)
|
|
{
|
|
StringInfoData buf;
|
|
ListCell *lc;
|
|
|
|
/* Construct and send the directory information */
|
|
pq_beginmessage(&buf, 'T'); /* RowDescription */
|
|
pq_sendint16(&buf, 3); /* 3 fields */
|
|
|
|
/* First field - spcoid */
|
|
pq_sendstring(&buf, "spcoid");
|
|
pq_sendint32(&buf, 0); /* table oid */
|
|
pq_sendint16(&buf, 0); /* attnum */
|
|
pq_sendint32(&buf, OIDOID); /* type oid */
|
|
pq_sendint16(&buf, 4); /* typlen */
|
|
pq_sendint32(&buf, 0); /* typmod */
|
|
pq_sendint16(&buf, 0); /* format code */
|
|
|
|
/* Second field - spclocation */
|
|
pq_sendstring(&buf, "spclocation");
|
|
pq_sendint32(&buf, 0);
|
|
pq_sendint16(&buf, 0);
|
|
pq_sendint32(&buf, TEXTOID);
|
|
pq_sendint16(&buf, -1);
|
|
pq_sendint32(&buf, 0);
|
|
pq_sendint16(&buf, 0);
|
|
|
|
/* Third field - size */
|
|
pq_sendstring(&buf, "size");
|
|
pq_sendint32(&buf, 0);
|
|
pq_sendint16(&buf, 0);
|
|
pq_sendint32(&buf, INT8OID);
|
|
pq_sendint16(&buf, 8);
|
|
pq_sendint32(&buf, 0);
|
|
pq_sendint16(&buf, 0);
|
|
pq_endmessage(&buf);
|
|
|
|
foreach(lc, tablespaces)
|
|
{
|
|
tablespaceinfo *ti = lfirst(lc);
|
|
|
|
/* Send one datarow message */
|
|
pq_beginmessage(&buf, 'D');
|
|
pq_sendint16(&buf, 3); /* number of columns */
|
|
if (ti->path == NULL)
|
|
{
|
|
pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
|
|
pq_sendint32(&buf, -1);
|
|
}
|
|
else
|
|
{
|
|
Size len;
|
|
|
|
len = strlen(ti->oid);
|
|
pq_sendint32(&buf, len);
|
|
pq_sendbytes(&buf, ti->oid, len);
|
|
|
|
len = strlen(ti->path);
|
|
pq_sendint32(&buf, len);
|
|
pq_sendbytes(&buf, ti->path, len);
|
|
}
|
|
if (ti->size >= 0)
|
|
send_int8_string(&buf, ti->size / 1024);
|
|
else
|
|
pq_sendint32(&buf, -1); /* NULL */
|
|
|
|
pq_endmessage(&buf);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Send a 64-bit integer as a string via the wire protocol.
|
|
*/
|
|
static void
|
|
send_int8_string(StringInfoData *buf, int64 intval)
|
|
{
|
|
char is[32];
|
|
|
|
sprintf(is, INT64_FORMAT, intval);
|
|
pq_sendint32(buf, strlen(is));
|
|
pq_sendbytes(buf, is, strlen(is));
|
|
}
|