mirror of
https://github.com/postgres/postgres.git
synced 2025-08-21 10:42:50 +03:00
Move basebackup code to new directory src/backend/backup
Reviewed by David Steele and Justin Pryzby Discussion: http://postgr.es/m/CA+TgmoafqboATDSoXHz8VLrSwK_MDhjthK4hEpYjqf9_1Fmczw%40mail.gmail.com
This commit is contained in:
30
src/backend/backup/Makefile
Normal file
30
src/backend/backup/Makefile
Normal file
@@ -0,0 +1,30 @@
|
||||
#-------------------------------------------------------------------------
|
||||
#
|
||||
# Makefile--
|
||||
# Makefile for src/backend/backup
|
||||
#
|
||||
# IDENTIFICATION
|
||||
# src/backend/backup/Makefile
|
||||
#
|
||||
#-------------------------------------------------------------------------
|
||||
|
||||
subdir = src/backend/backup
|
||||
top_builddir = ../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
||||
override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
|
||||
|
||||
OBJS = \
|
||||
backup_manifest.o \
|
||||
basebackup.o \
|
||||
basebackup_copy.o \
|
||||
basebackup_gzip.o \
|
||||
basebackup_lz4.o \
|
||||
basebackup_zstd.o \
|
||||
basebackup_progress.o \
|
||||
basebackup_server.o \
|
||||
basebackup_sink.o \
|
||||
basebackup_target.o \
|
||||
basebackup_throttle.o
|
||||
|
||||
include $(top_srcdir)/src/backend/common.mk
|
401
src/backend/backup/backup_manifest.c
Normal file
401
src/backend/backup/backup_manifest.c
Normal file
@@ -0,0 +1,401 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* backup_manifest.c
|
||||
* code for generating and sending a backup manifest
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/backup_manifest.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/timeline.h"
|
||||
#include "backup/backup_manifest.h"
|
||||
#include "backup/basebackup_sink.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/json.h"
|
||||
|
||||
static void AppendStringToManifest(backup_manifest_info *manifest, char *s);
|
||||
|
||||
/*
|
||||
* Does the user want a backup manifest?
|
||||
*
|
||||
* It's simplest to always have a manifest_info object, so that we don't need
|
||||
* checks for NULL pointers in too many places. However, if the user doesn't
|
||||
* want a manifest, we set manifest->buffile to NULL.
|
||||
*/
|
||||
static inline bool
|
||||
IsManifestEnabled(backup_manifest_info *manifest)
|
||||
{
|
||||
return (manifest->buffile != NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Convenience macro for appending data to the backup manifest.
|
||||
*/
|
||||
#define AppendToManifest(manifest, ...) \
|
||||
{ \
|
||||
char *_manifest_s = psprintf(__VA_ARGS__); \
|
||||
AppendStringToManifest(manifest, _manifest_s); \
|
||||
pfree(_manifest_s); \
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize state so that we can construct a backup manifest.
|
||||
*
|
||||
* NB: Although the checksum type for the data files is configurable, the
|
||||
* checksum for the manifest itself always uses SHA-256. See comments in
|
||||
* SendBackupManifest.
|
||||
*/
|
||||
void
|
||||
InitializeBackupManifest(backup_manifest_info *manifest,
|
||||
backup_manifest_option want_manifest,
|
||||
pg_checksum_type manifest_checksum_type)
|
||||
{
|
||||
memset(manifest, 0, sizeof(backup_manifest_info));
|
||||
manifest->checksum_type = manifest_checksum_type;
|
||||
|
||||
if (want_manifest == MANIFEST_OPTION_NO)
|
||||
manifest->buffile = NULL;
|
||||
else
|
||||
{
|
||||
manifest->buffile = BufFileCreateTemp(false);
|
||||
manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
|
||||
if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
|
||||
elog(ERROR, "failed to initialize checksum of backup manifest: %s",
|
||||
pg_cryptohash_error(manifest->manifest_ctx));
|
||||
}
|
||||
|
||||
manifest->manifest_size = UINT64CONST(0);
|
||||
manifest->force_encode = (want_manifest == MANIFEST_OPTION_FORCE_ENCODE);
|
||||
manifest->first_file = true;
|
||||
manifest->still_checksumming = true;
|
||||
|
||||
if (want_manifest != MANIFEST_OPTION_NO)
|
||||
AppendToManifest(manifest,
|
||||
"{ \"PostgreSQL-Backup-Manifest-Version\": 1,\n"
|
||||
"\"Files\": [");
|
||||
}
|
||||
|
||||
/*
|
||||
* Free resources assigned to a backup manifest constructed.
|
||||
*/
|
||||
void
|
||||
FreeBackupManifest(backup_manifest_info *manifest)
|
||||
{
|
||||
pg_cryptohash_free(manifest->manifest_ctx);
|
||||
manifest->manifest_ctx = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add an entry to the backup manifest for a file.
|
||||
*/
|
||||
void
|
||||
AddFileToBackupManifest(backup_manifest_info *manifest, const char *spcoid,
|
||||
const char *pathname, size_t size, pg_time_t mtime,
|
||||
pg_checksum_context *checksum_ctx)
|
||||
{
|
||||
char pathbuf[MAXPGPATH];
|
||||
int pathlen;
|
||||
StringInfoData buf;
|
||||
|
||||
if (!IsManifestEnabled(manifest))
|
||||
return;
|
||||
|
||||
/*
|
||||
* If this file is part of a tablespace, the pathname passed to this
|
||||
* function will be relative to the tar file that contains it. We want the
|
||||
* pathname relative to the data directory (ignoring the intermediate
|
||||
* symlink traversal).
|
||||
*/
|
||||
if (spcoid != NULL)
|
||||
{
|
||||
snprintf(pathbuf, sizeof(pathbuf), "pg_tblspc/%s/%s", spcoid,
|
||||
pathname);
|
||||
pathname = pathbuf;
|
||||
}
|
||||
|
||||
/*
|
||||
* Each file's entry needs to be separated from any entry that follows by
|
||||
* a comma, but there's no comma before the first one or after the last
|
||||
* one. To make that work, adding a file to the manifest starts by
|
||||
* terminating the most recently added line, with a comma if appropriate,
|
||||
* but does not terminate the line inserted for this file.
|
||||
*/
|
||||
initStringInfo(&buf);
|
||||
if (manifest->first_file)
|
||||
{
|
||||
appendStringInfoChar(&buf, '\n');
|
||||
manifest->first_file = false;
|
||||
}
|
||||
else
|
||||
appendStringInfoString(&buf, ",\n");
|
||||
|
||||
/*
|
||||
* Write the relative pathname to this file out to the manifest. The
|
||||
* manifest is always stored in UTF-8, so we have to encode paths that are
|
||||
* not valid in that encoding.
|
||||
*/
|
||||
pathlen = strlen(pathname);
|
||||
if (!manifest->force_encode &&
|
||||
pg_verify_mbstr(PG_UTF8, pathname, pathlen, true))
|
||||
{
|
||||
appendStringInfoString(&buf, "{ \"Path\": ");
|
||||
escape_json(&buf, pathname);
|
||||
appendStringInfoString(&buf, ", ");
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfoString(&buf, "{ \"Encoded-Path\": \"");
|
||||
enlargeStringInfo(&buf, 2 * pathlen);
|
||||
buf.len += hex_encode(pathname, pathlen,
|
||||
&buf.data[buf.len]);
|
||||
appendStringInfoString(&buf, "\", ");
|
||||
}
|
||||
|
||||
appendStringInfo(&buf, "\"Size\": %zu, ", size);
|
||||
|
||||
/*
|
||||
* Convert last modification time to a string and append it to the
|
||||
* manifest. Since it's not clear what time zone to use and since time
|
||||
* zone definitions can change, possibly causing confusion, use GMT
|
||||
* always.
|
||||
*/
|
||||
appendStringInfoString(&buf, "\"Last-Modified\": \"");
|
||||
enlargeStringInfo(&buf, 128);
|
||||
buf.len += pg_strftime(&buf.data[buf.len], 128, "%Y-%m-%d %H:%M:%S %Z",
|
||||
pg_gmtime(&mtime));
|
||||
appendStringInfoChar(&buf, '"');
|
||||
|
||||
/* Add checksum information. */
|
||||
if (checksum_ctx->type != CHECKSUM_TYPE_NONE)
|
||||
{
|
||||
uint8 checksumbuf[PG_CHECKSUM_MAX_LENGTH];
|
||||
int checksumlen;
|
||||
|
||||
checksumlen = pg_checksum_final(checksum_ctx, checksumbuf);
|
||||
if (checksumlen < 0)
|
||||
elog(ERROR, "could not finalize checksum of file \"%s\"",
|
||||
pathname);
|
||||
|
||||
appendStringInfo(&buf,
|
||||
", \"Checksum-Algorithm\": \"%s\", \"Checksum\": \"",
|
||||
pg_checksum_type_name(checksum_ctx->type));
|
||||
enlargeStringInfo(&buf, 2 * checksumlen);
|
||||
buf.len += hex_encode((char *) checksumbuf, checksumlen,
|
||||
&buf.data[buf.len]);
|
||||
appendStringInfoChar(&buf, '"');
|
||||
}
|
||||
|
||||
/* Close out the object. */
|
||||
appendStringInfoString(&buf, " }");
|
||||
|
||||
/* OK, add it to the manifest. */
|
||||
AppendStringToManifest(manifest, buf.data);
|
||||
|
||||
/* Avoid leaking memory. */
|
||||
pfree(buf.data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Add information about the WAL that will need to be replayed when restoring
|
||||
* this backup to the manifest.
|
||||
*/
|
||||
void
|
||||
AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr,
|
||||
TimeLineID starttli, XLogRecPtr endptr,
|
||||
TimeLineID endtli)
|
||||
{
|
||||
List *timelines;
|
||||
ListCell *lc;
|
||||
bool first_wal_range = true;
|
||||
bool found_start_timeline = false;
|
||||
|
||||
if (!IsManifestEnabled(manifest))
|
||||
return;
|
||||
|
||||
/* Terminate the list of files. */
|
||||
AppendStringToManifest(manifest, "\n],\n");
|
||||
|
||||
/* Read the timeline history for the ending timeline. */
|
||||
timelines = readTimeLineHistory(endtli);
|
||||
|
||||
/* Start a list of LSN ranges. */
|
||||
AppendStringToManifest(manifest, "\"WAL-Ranges\": [\n");
|
||||
|
||||
foreach(lc, timelines)
|
||||
{
|
||||
TimeLineHistoryEntry *entry = lfirst(lc);
|
||||
XLogRecPtr tl_beginptr;
|
||||
|
||||
/*
|
||||
* We only care about timelines that were active during the backup.
|
||||
* Skip any that ended before the backup started. (Note that if
|
||||
* entry->end is InvalidXLogRecPtr, it means that the timeline has not
|
||||
* yet ended.)
|
||||
*/
|
||||
if (!XLogRecPtrIsInvalid(entry->end) && entry->end < startptr)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Because the timeline history file lists newer timelines before
|
||||
* older ones, the first timeline we encounter that is new enough to
|
||||
* matter ought to match the ending timeline of the backup.
|
||||
*/
|
||||
if (first_wal_range && endtli != entry->tli)
|
||||
ereport(ERROR,
|
||||
errmsg("expected end timeline %u but found timeline %u",
|
||||
starttli, entry->tli));
|
||||
|
||||
/*
|
||||
* If this timeline entry matches with the timeline on which the
|
||||
* backup started, WAL needs to be checked from the start LSN of the
|
||||
* backup. If this entry refers to a newer timeline, WAL needs to be
|
||||
* checked since the beginning of this timeline, so use the LSN where
|
||||
* the timeline began.
|
||||
*/
|
||||
if (starttli == entry->tli)
|
||||
tl_beginptr = startptr;
|
||||
else
|
||||
{
|
||||
tl_beginptr = entry->begin;
|
||||
|
||||
/*
|
||||
* If we reach a TLI that has no valid beginning LSN, there can't
|
||||
* be any more timelines in the history after this point, so we'd
|
||||
* better have arrived at the expected starting TLI. If not,
|
||||
* something's gone horribly wrong.
|
||||
*/
|
||||
if (XLogRecPtrIsInvalid(entry->begin))
|
||||
ereport(ERROR,
|
||||
errmsg("expected start timeline %u but found timeline %u",
|
||||
starttli, entry->tli));
|
||||
}
|
||||
|
||||
AppendToManifest(manifest,
|
||||
"%s{ \"Timeline\": %u, \"Start-LSN\": \"%X/%X\", \"End-LSN\": \"%X/%X\" }",
|
||||
first_wal_range ? "" : ",\n",
|
||||
entry->tli,
|
||||
LSN_FORMAT_ARGS(tl_beginptr),
|
||||
LSN_FORMAT_ARGS(endptr));
|
||||
|
||||
if (starttli == entry->tli)
|
||||
{
|
||||
found_start_timeline = true;
|
||||
break;
|
||||
}
|
||||
|
||||
endptr = entry->begin;
|
||||
first_wal_range = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* The last entry in the timeline history for the ending timeline should
|
||||
* be the ending timeline itself. Verify that this is what we observed.
|
||||
*/
|
||||
if (!found_start_timeline)
|
||||
ereport(ERROR,
|
||||
errmsg("start timeline %u not found in history of timeline %u",
|
||||
starttli, endtli));
|
||||
|
||||
/* Terminate the list of WAL ranges. */
|
||||
AppendStringToManifest(manifest, "\n],\n");
|
||||
}
|
||||
|
||||
/*
|
||||
* Finalize the backup manifest, and send it to the client.
|
||||
*/
|
||||
void
|
||||
SendBackupManifest(backup_manifest_info *manifest, bbsink *sink)
|
||||
{
|
||||
uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH];
|
||||
char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH];
|
||||
size_t manifest_bytes_done = 0;
|
||||
|
||||
if (!IsManifestEnabled(manifest))
|
||||
return;
|
||||
|
||||
/*
|
||||
* Append manifest checksum, so that the problems with the manifest itself
|
||||
* can be detected.
|
||||
*
|
||||
* We always use SHA-256 for this, regardless of what algorithm is chosen
|
||||
* for checksumming the files. If we ever want to make the checksum
|
||||
* algorithm used for the manifest file variable, the client will need a
|
||||
* way to figure out which algorithm to use as close to the beginning of
|
||||
* the manifest file as possible, to avoid having to read the whole thing
|
||||
* twice.
|
||||
*/
|
||||
manifest->still_checksumming = false;
|
||||
if (pg_cryptohash_final(manifest->manifest_ctx, checksumbuf,
|
||||
sizeof(checksumbuf)) < 0)
|
||||
elog(ERROR, "failed to finalize checksum of backup manifest: %s",
|
||||
pg_cryptohash_error(manifest->manifest_ctx));
|
||||
AppendStringToManifest(manifest, "\"Manifest-Checksum\": \"");
|
||||
|
||||
hex_encode((char *) checksumbuf, sizeof checksumbuf, checksumstringbuf);
|
||||
checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH - 1] = '\0';
|
||||
|
||||
AppendStringToManifest(manifest, checksumstringbuf);
|
||||
AppendStringToManifest(manifest, "\"}\n");
|
||||
|
||||
/*
|
||||
* We've written all the data to the manifest file. Rewind the file so
|
||||
* that we can read it all back.
|
||||
*/
|
||||
if (BufFileSeek(manifest->buffile, 0, 0L, SEEK_SET))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not rewind temporary file")));
|
||||
|
||||
|
||||
/*
|
||||
* Send the backup manifest.
|
||||
*/
|
||||
bbsink_begin_manifest(sink);
|
||||
while (manifest_bytes_done < manifest->manifest_size)
|
||||
{
|
||||
size_t bytes_to_read;
|
||||
size_t rc;
|
||||
|
||||
bytes_to_read = Min(sink->bbs_buffer_length,
|
||||
manifest->manifest_size - manifest_bytes_done);
|
||||
rc = BufFileRead(manifest->buffile, sink->bbs_buffer,
|
||||
bytes_to_read);
|
||||
if (rc != bytes_to_read)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from temporary file: %m")));
|
||||
bbsink_manifest_contents(sink, bytes_to_read);
|
||||
manifest_bytes_done += bytes_to_read;
|
||||
}
|
||||
bbsink_end_manifest(sink);
|
||||
|
||||
/* Release resources */
|
||||
BufFileClose(manifest->buffile);
|
||||
}
|
||||
|
||||
/*
|
||||
* Append a cstring to the manifest.
|
||||
*/
|
||||
static void
|
||||
AppendStringToManifest(backup_manifest_info *manifest, char *s)
|
||||
{
|
||||
int len = strlen(s);
|
||||
|
||||
Assert(manifest != NULL);
|
||||
if (manifest->still_checksumming)
|
||||
{
|
||||
if (pg_cryptohash_update(manifest->manifest_ctx, (uint8 *) s, len) < 0)
|
||||
elog(ERROR, "failed to update checksum of backup manifest: %s",
|
||||
pg_cryptohash_error(manifest->manifest_ctx));
|
||||
}
|
||||
BufFileWrite(manifest->buffile, s, len);
|
||||
manifest->manifest_size += len;
|
||||
}
|
1829
src/backend/backup/basebackup.c
Normal file
1829
src/backend/backup/basebackup.c
Normal file
File diff suppressed because it is too large
Load Diff
420
src/backend/backup/basebackup_copy.c
Normal file
420
src/backend/backup/basebackup_copy.c
Normal file
@@ -0,0 +1,420 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_copy.c
|
||||
* send basebackup archives using COPY OUT
|
||||
*
|
||||
* We send a result set with information about the tabelspaces to be included
|
||||
* in the backup before starting COPY OUT. Then, we start a single COPY OUT
|
||||
* operation and transmits all the archives and the manifest if present during
|
||||
* the course of that single COPY OUT. Each CopyData message begins with a
|
||||
* type byte, allowing us to signal the start of a new archive, or the
|
||||
* manifest, by some means other than ending the COPY stream. This also allows
|
||||
* for future protocol extensions, since we can include arbitrary information
|
||||
* in the message stream as long as we're certain that the client will know
|
||||
* what to do with it.
|
||||
*
|
||||
* An older method that sent each archive using a separate COPY OUT
|
||||
* operation is no longer supported.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_copy.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/tupdesc.h"
|
||||
#include "backup/basebackup.h"
|
||||
#include "backup/basebackup_sink.h"
|
||||
#include "catalog/pg_type_d.h"
|
||||
#include "executor/executor.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "tcop/dest.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
typedef struct bbsink_copystream
|
||||
{
|
||||
/* Common information for all types of sink. */
|
||||
bbsink base;
|
||||
|
||||
/* Are we sending the archives to the client, or somewhere else? */
|
||||
bool send_to_client;
|
||||
|
||||
/*
|
||||
* Protocol message buffer. We assemble CopyData protocol messages by
|
||||
* setting the first character of this buffer to 'd' (archive or manifest
|
||||
* data) and then making base.bbs_buffer point to the second character so
|
||||
* that the rest of the data gets copied into the message just where we
|
||||
* want it.
|
||||
*/
|
||||
char *msgbuffer;
|
||||
|
||||
/*
|
||||
* When did we last report progress to the client, and how much progress
|
||||
* did we report?
|
||||
*/
|
||||
TimestampTz last_progress_report_time;
|
||||
uint64 bytes_done_at_last_time_check;
|
||||
} bbsink_copystream;
|
||||
|
||||
/*
|
||||
* We don't want to send progress messages to the client excessively
|
||||
* frequently. Ideally, we'd like to send a message when the time since the
|
||||
* last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
|
||||
* the system time every time we send a tiny bit of data seems too expensive.
|
||||
* So we only check it after the number of bytes sine the last check reaches
|
||||
* PROGRESS_REPORT_BYTE_INTERVAL.
|
||||
*/
|
||||
#define PROGRESS_REPORT_BYTE_INTERVAL 65536
|
||||
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
|
||||
|
||||
static void bbsink_copystream_begin_backup(bbsink *sink);
|
||||
static void bbsink_copystream_begin_archive(bbsink *sink,
|
||||
const char *archive_name);
|
||||
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_copystream_end_archive(bbsink *sink);
|
||||
static void bbsink_copystream_begin_manifest(bbsink *sink);
|
||||
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_copystream_end_manifest(bbsink *sink);
|
||||
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
|
||||
TimeLineID endtli);
|
||||
static void bbsink_copystream_cleanup(bbsink *sink);
|
||||
|
||||
static void SendCopyOutResponse(void);
|
||||
static void SendCopyDone(void);
|
||||
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
|
||||
static void SendTablespaceList(List *tablespaces);
|
||||
|
||||
static const bbsink_ops bbsink_copystream_ops = {
|
||||
.begin_backup = bbsink_copystream_begin_backup,
|
||||
.begin_archive = bbsink_copystream_begin_archive,
|
||||
.archive_contents = bbsink_copystream_archive_contents,
|
||||
.end_archive = bbsink_copystream_end_archive,
|
||||
.begin_manifest = bbsink_copystream_begin_manifest,
|
||||
.manifest_contents = bbsink_copystream_manifest_contents,
|
||||
.end_manifest = bbsink_copystream_end_manifest,
|
||||
.end_backup = bbsink_copystream_end_backup,
|
||||
.cleanup = bbsink_copystream_cleanup
|
||||
};
|
||||
|
||||
/*
|
||||
* Create a new 'copystream' bbsink.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_copystream_new(bool send_to_client)
|
||||
{
|
||||
bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
|
||||
|
||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
|
||||
sink->send_to_client = send_to_client;
|
||||
|
||||
/* Set up for periodic progress reporting. */
|
||||
sink->last_progress_report_time = GetCurrentTimestamp();
|
||||
sink->bytes_done_at_last_time_check = UINT64CONST(0);
|
||||
|
||||
return &sink->base;
|
||||
}
|
||||
|
||||
/*
|
||||
* Send start-of-backup wire protocol messages.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_begin_backup(bbsink *sink)
|
||||
{
|
||||
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
||||
bbsink_state *state = sink->bbs_state;
|
||||
char *buf;
|
||||
|
||||
/*
|
||||
* Initialize buffer. We ultimately want to send the archive and manifest
|
||||
* data by means of CopyData messages where the payload portion of each
|
||||
* message begins with a type byte. However, basebackup.c expects the
|
||||
* buffer to be aligned, so we can't just allocate one extra byte for the
|
||||
* type byte. Instead, allocate enough extra bytes that the portion of the
|
||||
* buffer we reveal to our callers can be aligned, while leaving room to
|
||||
* slip the type byte in just beforehand. That will allow us to ship the
|
||||
* data with a single call to pq_putmessage and without needing any extra
|
||||
* copying.
|
||||
*/
|
||||
buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
|
||||
mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
|
||||
mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
|
||||
mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
|
||||
|
||||
/* Tell client the backup start location. */
|
||||
SendXlogRecPtrResult(state->startptr, state->starttli);
|
||||
|
||||
/* Send client a list of tablespaces. */
|
||||
SendTablespaceList(state->tablespaces);
|
||||
|
||||
/* Send a CommandComplete message */
|
||||
pq_puttextmessage('C', "SELECT");
|
||||
|
||||
/* Begin COPY stream. This will be used for all archives + manifest. */
|
||||
SendCopyOutResponse();
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a CopyData message announcing the beginning of a new archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
|
||||
{
|
||||
bbsink_state *state = sink->bbs_state;
|
||||
tablespaceinfo *ti;
|
||||
StringInfoData buf;
|
||||
|
||||
ti = list_nth(state->tablespaces, state->tablespace_num);
|
||||
pq_beginmessage(&buf, 'd'); /* CopyData */
|
||||
pq_sendbyte(&buf, 'n'); /* New archive */
|
||||
pq_sendstring(&buf, archive_name);
|
||||
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
|
||||
pq_endmessage(&buf);
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a CopyData message containing a chunk of archive content.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
||||
bbsink_state *state = mysink->base.bbs_state;
|
||||
StringInfoData buf;
|
||||
uint64 targetbytes;
|
||||
|
||||
/* Send the archive content to the client, if appropriate. */
|
||||
if (mysink->send_to_client)
|
||||
{
|
||||
/* Add one because we're also sending a leading type byte. */
|
||||
pq_putmessage('d', mysink->msgbuffer, len + 1);
|
||||
}
|
||||
|
||||
/* Consider whether to send a progress report to the client. */
|
||||
targetbytes = mysink->bytes_done_at_last_time_check
|
||||
+ PROGRESS_REPORT_BYTE_INTERVAL;
|
||||
if (targetbytes <= state->bytes_done)
|
||||
{
|
||||
TimestampTz now = GetCurrentTimestamp();
|
||||
long ms;
|
||||
|
||||
/*
|
||||
* OK, we've sent a decent number of bytes, so check the system time
|
||||
* to see whether we're due to send a progress report.
|
||||
*/
|
||||
mysink->bytes_done_at_last_time_check = state->bytes_done;
|
||||
ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
|
||||
now);
|
||||
|
||||
/*
|
||||
* Send a progress report if enough time has passed. Also send one if
|
||||
* the system clock was set backward, so that such occurrences don't
|
||||
* have the effect of suppressing further progress messages.
|
||||
*/
|
||||
if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
|
||||
{
|
||||
mysink->last_progress_report_time = now;
|
||||
|
||||
pq_beginmessage(&buf, 'd'); /* CopyData */
|
||||
pq_sendbyte(&buf, 'p'); /* Progress report */
|
||||
pq_sendint64(&buf, state->bytes_done);
|
||||
pq_endmessage(&buf);
|
||||
pq_flush_if_writable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We don't need to explicitly signal the end of the archive; the client
|
||||
* will figure out that we've reached the end when we begin the next one,
|
||||
* or begin the manifest, or end the COPY stream. However, this seems like
|
||||
* a good time to force out a progress report. One reason for that is that
|
||||
* if this is the last archive, and we don't force a progress report now,
|
||||
* the client will never be told that we sent all the bytes.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_end_archive(bbsink *sink)
|
||||
{
|
||||
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
||||
bbsink_state *state = mysink->base.bbs_state;
|
||||
StringInfoData buf;
|
||||
|
||||
mysink->bytes_done_at_last_time_check = state->bytes_done;
|
||||
mysink->last_progress_report_time = GetCurrentTimestamp();
|
||||
pq_beginmessage(&buf, 'd'); /* CopyData */
|
||||
pq_sendbyte(&buf, 'p'); /* Progress report */
|
||||
pq_sendint64(&buf, state->bytes_done);
|
||||
pq_endmessage(&buf);
|
||||
pq_flush_if_writable();
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a CopyData message announcing the beginning of the backup manifest.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_begin_manifest(bbsink *sink)
|
||||
{
|
||||
StringInfoData buf;
|
||||
|
||||
pq_beginmessage(&buf, 'd'); /* CopyData */
|
||||
pq_sendbyte(&buf, 'm'); /* Manifest */
|
||||
pq_endmessage(&buf);
|
||||
}
|
||||
|
||||
/*
|
||||
* Each chunk of manifest data is sent using a CopyData message.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_copystream *mysink = (bbsink_copystream *) sink;
|
||||
|
||||
if (mysink->send_to_client)
|
||||
{
|
||||
/* Add one because we're also sending a leading type byte. */
|
||||
pq_putmessage('d', mysink->msgbuffer, len + 1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We don't need an explicit terminator for the backup manifest.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_end_manifest(bbsink *sink)
|
||||
{
|
||||
/* Do nothing. */
|
||||
}
|
||||
|
||||
/*
|
||||
* Send end-of-backup wire protocol messages.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
|
||||
TimeLineID endtli)
|
||||
{
|
||||
SendCopyDone();
|
||||
SendXlogRecPtrResult(endptr, endtli);
|
||||
}
|
||||
|
||||
/*
|
||||
* Cleanup.
|
||||
*/
|
||||
static void
|
||||
bbsink_copystream_cleanup(bbsink *sink)
|
||||
{
|
||||
/* Nothing to do. */
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a CopyOutResponse message.
|
||||
*/
|
||||
static void
|
||||
SendCopyOutResponse(void)
|
||||
{
|
||||
StringInfoData buf;
|
||||
|
||||
pq_beginmessage(&buf, 'H');
|
||||
pq_sendbyte(&buf, 0); /* overall format */
|
||||
pq_sendint16(&buf, 0); /* natts */
|
||||
pq_endmessage(&buf);
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a CopyDone message.
|
||||
*/
|
||||
static void
|
||||
SendCopyDone(void)
|
||||
{
|
||||
pq_putemptymessage('c');
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a single resultset containing just a single
|
||||
* XLogRecPtr record (in text format)
|
||||
*/
|
||||
static void
|
||||
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
|
||||
{
|
||||
DestReceiver *dest;
|
||||
TupOutputState *tstate;
|
||||
TupleDesc tupdesc;
|
||||
Datum values[2];
|
||||
bool nulls[2] = {0};
|
||||
|
||||
dest = CreateDestReceiver(DestRemoteSimple);
|
||||
|
||||
tupdesc = CreateTemplateTupleDesc(2);
|
||||
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
|
||||
/*
|
||||
* int8 may seem like a surprising data type for this, but in theory int4
|
||||
* would not be wide enough for this, as TimeLineID is unsigned.
|
||||
*/
|
||||
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
|
||||
|
||||
/* send RowDescription */
|
||||
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
|
||||
|
||||
/* Data row */
|
||||
values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
|
||||
values[1] = Int64GetDatum(tli);
|
||||
do_tup_output(tstate, values, nulls);
|
||||
|
||||
end_tup_output(tstate);
|
||||
|
||||
/* Send a CommandComplete message */
|
||||
pq_puttextmessage('C', "SELECT");
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a result set via libpq describing the tablespace list.
|
||||
*/
|
||||
static void
|
||||
SendTablespaceList(List *tablespaces)
|
||||
{
|
||||
DestReceiver *dest;
|
||||
TupOutputState *tstate;
|
||||
TupleDesc tupdesc;
|
||||
ListCell *lc;
|
||||
|
||||
dest = CreateDestReceiver(DestRemoteSimple);
|
||||
|
||||
tupdesc = CreateTemplateTupleDesc(3);
|
||||
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
|
||||
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
|
||||
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
|
||||
|
||||
/* send RowDescription */
|
||||
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
|
||||
|
||||
/* Construct and send the directory information */
|
||||
foreach(lc, tablespaces)
|
||||
{
|
||||
tablespaceinfo *ti = lfirst(lc);
|
||||
Datum values[3];
|
||||
bool nulls[3] = {0};
|
||||
|
||||
/* Send one datarow message */
|
||||
if (ti->path == NULL)
|
||||
{
|
||||
nulls[0] = true;
|
||||
nulls[1] = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
|
||||
values[1] = CStringGetTextDatum(ti->path);
|
||||
}
|
||||
if (ti->size >= 0)
|
||||
values[2] = Int64GetDatum(ti->size / 1024);
|
||||
else
|
||||
nulls[2] = true;
|
||||
|
||||
do_tup_output(tstate, values, nulls);
|
||||
}
|
||||
|
||||
end_tup_output(tstate);
|
||||
}
|
308
src/backend/backup/basebackup_gzip.c
Normal file
308
src/backend/backup/basebackup_gzip.c
Normal file
@@ -0,0 +1,308 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_gzip.c
|
||||
* Basebackup sink implementing gzip compression.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_gzip.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
#include "backup/basebackup_sink.h"
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
typedef struct bbsink_gzip
|
||||
{
|
||||
/* Common information for all types of sink. */
|
||||
bbsink base;
|
||||
|
||||
/* Compression level. */
|
||||
int compresslevel;
|
||||
|
||||
/* Compressed data stream. */
|
||||
z_stream zstream;
|
||||
|
||||
/* Number of bytes staged in output buffer. */
|
||||
size_t bytes_written;
|
||||
} bbsink_gzip;
|
||||
|
||||
static void bbsink_gzip_begin_backup(bbsink *sink);
|
||||
static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
|
||||
static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_gzip_end_archive(bbsink *sink);
|
||||
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
|
||||
static void gzip_pfree(void *opaque, void *address);
|
||||
|
||||
static const bbsink_ops bbsink_gzip_ops = {
|
||||
.begin_backup = bbsink_gzip_begin_backup,
|
||||
.begin_archive = bbsink_gzip_begin_archive,
|
||||
.archive_contents = bbsink_gzip_archive_contents,
|
||||
.end_archive = bbsink_gzip_end_archive,
|
||||
.begin_manifest = bbsink_forward_begin_manifest,
|
||||
.manifest_contents = bbsink_gzip_manifest_contents,
|
||||
.end_manifest = bbsink_forward_end_manifest,
|
||||
.end_backup = bbsink_forward_end_backup,
|
||||
.cleanup = bbsink_forward_cleanup
|
||||
};
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Create a new basebackup sink that performs gzip compression.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
|
||||
{
|
||||
#ifndef HAVE_LIBZ
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("gzip compression is not supported by this build")));
|
||||
return NULL; /* keep compiler quiet */
|
||||
#else
|
||||
bbsink_gzip *sink;
|
||||
int compresslevel;
|
||||
|
||||
Assert(next != NULL);
|
||||
|
||||
if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) == 0)
|
||||
compresslevel = Z_DEFAULT_COMPRESSION;
|
||||
else
|
||||
{
|
||||
compresslevel = compress->level;
|
||||
Assert(compresslevel >= 1 && compresslevel <= 9);
|
||||
}
|
||||
|
||||
sink = palloc0(sizeof(bbsink_gzip));
|
||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
|
||||
sink->base.bbs_next = next;
|
||||
sink->compresslevel = compresslevel;
|
||||
|
||||
return &sink->base;
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
|
||||
/*
|
||||
* Begin backup.
|
||||
*/
|
||||
static void
|
||||
bbsink_gzip_begin_backup(bbsink *sink)
|
||||
{
|
||||
/*
|
||||
* We need our own buffer, because we're going to pass different data to
|
||||
* the next sink than what gets passed to us.
|
||||
*/
|
||||
sink->bbs_buffer = palloc(sink->bbs_buffer_length);
|
||||
|
||||
/*
|
||||
* Since deflate() doesn't require the output buffer to be of any
|
||||
* particular size, we can just make it the same size as the input buffer.
|
||||
*/
|
||||
bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
|
||||
sink->bbs_buffer_length);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare to compress the next archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
|
||||
{
|
||||
bbsink_gzip *mysink = (bbsink_gzip *) sink;
|
||||
char *gz_archive_name;
|
||||
z_stream *zs = &mysink->zstream;
|
||||
|
||||
/* Initialize compressor object. */
|
||||
memset(zs, 0, sizeof(z_stream));
|
||||
zs->zalloc = gzip_palloc;
|
||||
zs->zfree = gzip_pfree;
|
||||
zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
|
||||
zs->avail_out = sink->bbs_next->bbs_buffer_length;
|
||||
|
||||
/*
|
||||
* We need to use deflateInit2() rather than deflateInit() here so that we
|
||||
* can request a gzip header rather than a zlib header. Otherwise, we want
|
||||
* to supply the same values that would have been used by default if we
|
||||
* had just called deflateInit().
|
||||
*
|
||||
* Per the documentation for deflateInit2, the third argument must be
|
||||
* Z_DEFLATED; the fourth argument is the number of "window bits", by
|
||||
* default 15, but adding 16 gets you a gzip header rather than a zlib
|
||||
* header; the fifth argument controls memory usage, and 8 is the default;
|
||||
* and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
|
||||
*/
|
||||
if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
|
||||
Z_DEFAULT_STRATEGY) != Z_OK)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not initialize compression library"));
|
||||
|
||||
/*
|
||||
* Add ".gz" to the archive name. Note that the pg_basebackup -z produces
|
||||
* archives named ".tar.gz" rather than ".tgz", so we match that here.
|
||||
*/
|
||||
gz_archive_name = psprintf("%s.gz", archive_name);
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_begin_archive(sink->bbs_next, gz_archive_name);
|
||||
pfree(gz_archive_name);
|
||||
}
|
||||
|
||||
/*
|
||||
* Compress the input data to the output buffer until we run out of input
|
||||
* data. Each time the output buffer fills up, invoke the archive_contents()
|
||||
* method for then next sink.
|
||||
*
|
||||
* Note that since we're compressing the input, it may very commonly happen
|
||||
* that we consume all the input data without filling the output buffer. In
|
||||
* that case, the compressed representation of the current input data won't
|
||||
* actually be sent to the next bbsink until a later call to this function,
|
||||
* or perhaps even not until bbsink_gzip_end_archive() is invoked.
|
||||
*/
|
||||
static void
|
||||
bbsink_gzip_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_gzip *mysink = (bbsink_gzip *) sink;
|
||||
z_stream *zs = &mysink->zstream;
|
||||
|
||||
/* Compress data from input buffer. */
|
||||
zs->next_in = (uint8 *) mysink->base.bbs_buffer;
|
||||
zs->avail_in = len;
|
||||
|
||||
while (zs->avail_in > 0)
|
||||
{
|
||||
int res;
|
||||
|
||||
/* Write output data into unused portion of output buffer. */
|
||||
Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
|
||||
zs->next_out = (uint8 *)
|
||||
mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
|
||||
zs->avail_out =
|
||||
mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
|
||||
|
||||
/*
|
||||
* Try to compress. Note that this will update zs->next_in and
|
||||
* zs->avail_in according to how much input data was consumed, and
|
||||
* zs->next_out and zs->avail_out according to how many output bytes
|
||||
* were produced.
|
||||
*
|
||||
* According to the zlib documentation, Z_STREAM_ERROR should only
|
||||
* occur if we've made a programming error, or if say there's been a
|
||||
* memory clobber; we use elog() rather than Assert() here out of an
|
||||
* abundance of caution.
|
||||
*/
|
||||
res = deflate(zs, Z_NO_FLUSH);
|
||||
if (res == Z_STREAM_ERROR)
|
||||
elog(ERROR, "could not compress data: %s", zs->msg);
|
||||
|
||||
/* Update our notion of how many bytes we've written. */
|
||||
mysink->bytes_written =
|
||||
mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
|
||||
|
||||
/*
|
||||
* If the output buffer is full, it's time for the next sink to
|
||||
* process the contents.
|
||||
*/
|
||||
if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
|
||||
{
|
||||
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
|
||||
mysink->bytes_written = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* There might be some data inside zlib's internal buffers; we need to get
|
||||
* that flushed out and forwarded to the successor sink as archive content.
|
||||
*
|
||||
* Then we can end processing for this archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_gzip_end_archive(bbsink *sink)
|
||||
{
|
||||
bbsink_gzip *mysink = (bbsink_gzip *) sink;
|
||||
z_stream *zs = &mysink->zstream;
|
||||
|
||||
/* There is no more data available. */
|
||||
zs->next_in = (uint8 *) mysink->base.bbs_buffer;
|
||||
zs->avail_in = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
int res;
|
||||
|
||||
/* Write output data into unused portion of output buffer. */
|
||||
Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
|
||||
zs->next_out = (uint8 *)
|
||||
mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
|
||||
zs->avail_out =
|
||||
mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
|
||||
|
||||
/*
|
||||
* As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
|
||||
* no more input.
|
||||
*/
|
||||
res = deflate(zs, Z_FINISH);
|
||||
if (res == Z_STREAM_ERROR)
|
||||
elog(ERROR, "could not compress data: %s", zs->msg);
|
||||
|
||||
/* Update our notion of how many bytes we've written. */
|
||||
mysink->bytes_written =
|
||||
mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
|
||||
|
||||
/*
|
||||
* Apparently we had no data in the output buffer and deflate() was
|
||||
* not able to add any. We must be done.
|
||||
*/
|
||||
if (mysink->bytes_written == 0)
|
||||
break;
|
||||
|
||||
/* Send whatever accumulated output bytes we have. */
|
||||
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
|
||||
mysink->bytes_written = 0;
|
||||
}
|
||||
|
||||
/* Must also pass on the information that this archive has ended. */
|
||||
bbsink_forward_end_archive(sink);
|
||||
}
|
||||
|
||||
/*
|
||||
* Manifest contents are not compressed, but we do need to copy them into
|
||||
* the successor sink's buffer, because we have our own.
|
||||
*/
|
||||
static void
|
||||
bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
|
||||
bbsink_manifest_contents(sink->bbs_next, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* Wrapper function to adjust the signature of palloc to match what libz
|
||||
* expects.
|
||||
*/
|
||||
static void *
|
||||
gzip_palloc(void *opaque, unsigned items, unsigned size)
|
||||
{
|
||||
return palloc(items * size);
|
||||
}
|
||||
|
||||
/*
|
||||
* Wrapper function to adjust the signature of pfree to match what libz
|
||||
* expects.
|
||||
*/
|
||||
static void
|
||||
gzip_pfree(void *opaque, void *address)
|
||||
{
|
||||
pfree(address);
|
||||
}
|
||||
|
||||
#endif
|
301
src/backend/backup/basebackup_lz4.c
Normal file
301
src/backend/backup/basebackup_lz4.c
Normal file
@@ -0,0 +1,301 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_lz4.c
|
||||
* Basebackup sink implementing lz4 compression.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_lz4.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#ifdef USE_LZ4
|
||||
#include <lz4frame.h>
|
||||
#endif
|
||||
|
||||
#include "backup/basebackup_sink.h"
|
||||
|
||||
#ifdef USE_LZ4
|
||||
|
||||
typedef struct bbsink_lz4
|
||||
{
|
||||
/* Common information for all types of sink. */
|
||||
bbsink base;
|
||||
|
||||
/* Compression level. */
|
||||
int compresslevel;
|
||||
|
||||
LZ4F_compressionContext_t ctx;
|
||||
LZ4F_preferences_t prefs;
|
||||
|
||||
/* Number of bytes staged in output buffer. */
|
||||
size_t bytes_written;
|
||||
} bbsink_lz4;
|
||||
|
||||
static void bbsink_lz4_begin_backup(bbsink *sink);
|
||||
static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
|
||||
static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
|
||||
static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_lz4_end_archive(bbsink *sink);
|
||||
static void bbsink_lz4_cleanup(bbsink *sink);
|
||||
|
||||
static const bbsink_ops bbsink_lz4_ops = {
|
||||
.begin_backup = bbsink_lz4_begin_backup,
|
||||
.begin_archive = bbsink_lz4_begin_archive,
|
||||
.archive_contents = bbsink_lz4_archive_contents,
|
||||
.end_archive = bbsink_lz4_end_archive,
|
||||
.begin_manifest = bbsink_forward_begin_manifest,
|
||||
.manifest_contents = bbsink_lz4_manifest_contents,
|
||||
.end_manifest = bbsink_forward_end_manifest,
|
||||
.end_backup = bbsink_forward_end_backup,
|
||||
.cleanup = bbsink_lz4_cleanup
|
||||
};
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Create a new basebackup sink that performs lz4 compression.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
|
||||
{
|
||||
#ifndef USE_LZ4
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("lz4 compression is not supported by this build")));
|
||||
return NULL; /* keep compiler quiet */
|
||||
#else
|
||||
bbsink_lz4 *sink;
|
||||
int compresslevel;
|
||||
|
||||
Assert(next != NULL);
|
||||
|
||||
if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) == 0)
|
||||
compresslevel = 0;
|
||||
else
|
||||
{
|
||||
compresslevel = compress->level;
|
||||
Assert(compresslevel >= 1 && compresslevel <= 12);
|
||||
}
|
||||
|
||||
sink = palloc0(sizeof(bbsink_lz4));
|
||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
|
||||
sink->base.bbs_next = next;
|
||||
sink->compresslevel = compresslevel;
|
||||
|
||||
return &sink->base;
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef USE_LZ4
|
||||
|
||||
/*
|
||||
* Begin backup.
|
||||
*/
|
||||
static void
|
||||
bbsink_lz4_begin_backup(bbsink *sink)
|
||||
{
|
||||
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
|
||||
size_t output_buffer_bound;
|
||||
LZ4F_preferences_t *prefs = &mysink->prefs;
|
||||
|
||||
/* Initialize compressor object. */
|
||||
memset(prefs, 0, sizeof(LZ4F_preferences_t));
|
||||
prefs->frameInfo.blockSizeID = LZ4F_max256KB;
|
||||
prefs->compressionLevel = mysink->compresslevel;
|
||||
|
||||
/*
|
||||
* We need our own buffer, because we're going to pass different data to
|
||||
* the next sink than what gets passed to us.
|
||||
*/
|
||||
mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
|
||||
|
||||
/*
|
||||
* Since LZ4F_compressUpdate() requires the output buffer of size equal or
|
||||
* greater than that of LZ4F_compressBound(), make sure we have the next
|
||||
* sink's bbs_buffer of length that can accommodate the compressed input
|
||||
* buffer.
|
||||
*/
|
||||
output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
|
||||
&mysink->prefs);
|
||||
|
||||
/*
|
||||
* The buffer length is expected to be a multiple of BLCKSZ, so round up.
|
||||
*/
|
||||
output_buffer_bound = output_buffer_bound + BLCKSZ -
|
||||
(output_buffer_bound % BLCKSZ);
|
||||
|
||||
bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare to compress the next archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
|
||||
{
|
||||
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
|
||||
char *lz4_archive_name;
|
||||
LZ4F_errorCode_t ctxError;
|
||||
size_t headerSize;
|
||||
|
||||
ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(ctxError))
|
||||
elog(ERROR, "could not create lz4 compression context: %s",
|
||||
LZ4F_getErrorName(ctxError));
|
||||
|
||||
/* First of all write the frame header to destination buffer. */
|
||||
headerSize = LZ4F_compressBegin(mysink->ctx,
|
||||
mysink->base.bbs_next->bbs_buffer,
|
||||
mysink->base.bbs_next->bbs_buffer_length,
|
||||
&mysink->prefs);
|
||||
|
||||
if (LZ4F_isError(headerSize))
|
||||
elog(ERROR, "could not write lz4 header: %s",
|
||||
LZ4F_getErrorName(headerSize));
|
||||
|
||||
/*
|
||||
* We need to write the compressed data after the header in the output
|
||||
* buffer. So, make sure to update the notion of bytes written to output
|
||||
* buffer.
|
||||
*/
|
||||
mysink->bytes_written += headerSize;
|
||||
|
||||
/* Add ".lz4" to the archive name. */
|
||||
lz4_archive_name = psprintf("%s.lz4", archive_name);
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
|
||||
pfree(lz4_archive_name);
|
||||
}
|
||||
|
||||
/*
|
||||
* Compress the input data to the output buffer until we run out of input
|
||||
* data. Each time the output buffer falls below the compression bound for
|
||||
* the input buffer, invoke the archive_contents() method for then next sink.
|
||||
*
|
||||
* Note that since we're compressing the input, it may very commonly happen
|
||||
* that we consume all the input data without filling the output buffer. In
|
||||
* that case, the compressed representation of the current input data won't
|
||||
* actually be sent to the next bbsink until a later call to this function,
|
||||
* or perhaps even not until bbsink_lz4_end_archive() is invoked.
|
||||
*/
|
||||
static void
|
||||
bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
|
||||
{
|
||||
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
|
||||
size_t compressedSize;
|
||||
size_t avail_in_bound;
|
||||
|
||||
avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
|
||||
|
||||
/*
|
||||
* If the number of available bytes has fallen below the value computed by
|
||||
* LZ4F_compressBound(), ask the next sink to process the data so that we
|
||||
* can empty the buffer.
|
||||
*/
|
||||
if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
|
||||
avail_in_bound)
|
||||
{
|
||||
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
|
||||
mysink->bytes_written = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Compress the input buffer and write it into the output buffer.
|
||||
*/
|
||||
compressedSize = LZ4F_compressUpdate(mysink->ctx,
|
||||
mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
|
||||
mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
|
||||
(uint8 *) mysink->base.bbs_buffer,
|
||||
avail_in,
|
||||
NULL);
|
||||
|
||||
if (LZ4F_isError(compressedSize))
|
||||
elog(ERROR, "could not compress data: %s",
|
||||
LZ4F_getErrorName(compressedSize));
|
||||
|
||||
/*
|
||||
* Update our notion of how many bytes we've written into output buffer.
|
||||
*/
|
||||
mysink->bytes_written += compressedSize;
|
||||
}
|
||||
|
||||
/*
|
||||
* There might be some data inside lz4's internal buffers; we need to get
|
||||
* that flushed out and also finalize the lz4 frame and then get that forwarded
|
||||
* to the successor sink as archive content.
|
||||
*
|
||||
* Then we can end processing for this archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_lz4_end_archive(bbsink *sink)
|
||||
{
|
||||
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
|
||||
size_t compressedSize;
|
||||
size_t lz4_footer_bound;
|
||||
|
||||
lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
|
||||
|
||||
Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
|
||||
|
||||
if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
|
||||
lz4_footer_bound)
|
||||
{
|
||||
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
|
||||
mysink->bytes_written = 0;
|
||||
}
|
||||
|
||||
compressedSize = LZ4F_compressEnd(mysink->ctx,
|
||||
mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
|
||||
mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
|
||||
NULL);
|
||||
|
||||
if (LZ4F_isError(compressedSize))
|
||||
elog(ERROR, "could not end lz4 compression: %s",
|
||||
LZ4F_getErrorName(compressedSize));
|
||||
|
||||
/* Update our notion of how many bytes we've written. */
|
||||
mysink->bytes_written += compressedSize;
|
||||
|
||||
/* Send whatever accumulated output bytes we have. */
|
||||
bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
|
||||
mysink->bytes_written = 0;
|
||||
|
||||
/* Release the resources. */
|
||||
LZ4F_freeCompressionContext(mysink->ctx);
|
||||
mysink->ctx = NULL;
|
||||
|
||||
/* Pass on the information that this archive has ended. */
|
||||
bbsink_forward_end_archive(sink);
|
||||
}
|
||||
|
||||
/*
|
||||
* Manifest contents are not compressed, but we do need to copy them into
|
||||
* the successor sink's buffer, because we have our own.
|
||||
*/
|
||||
static void
|
||||
bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
|
||||
bbsink_manifest_contents(sink->bbs_next, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* In case the backup fails, make sure we free the compression context by
|
||||
* calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
|
||||
*/
|
||||
static void
|
||||
bbsink_lz4_cleanup(bbsink *sink)
|
||||
{
|
||||
bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
|
||||
|
||||
if (mysink->ctx)
|
||||
{
|
||||
LZ4F_freeCompressionContext(mysink->ctx);
|
||||
mysink->ctx = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
246
src/backend/backup/basebackup_progress.c
Normal file
246
src/backend/backup/basebackup_progress.c
Normal file
@@ -0,0 +1,246 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_progress.c
|
||||
* Basebackup sink implementing progress tracking, including but not
|
||||
* limited to command progress reporting.
|
||||
*
|
||||
* This should be used even if the PROGRESS option to the replication
|
||||
* command BASE_BACKUP is not specified. Without that option, we won't
|
||||
* have tallied up the size of the files that are going to need to be
|
||||
* backed up, but we can still report to the command progress reporting
|
||||
* facility how much data we've processed.
|
||||
*
|
||||
* Moreover, we also use this as a convenient place to update certain
|
||||
* fields of the bbsink_state. That work is accurately described as
|
||||
* keeping track of our progress, but it's not just for introspection.
|
||||
* We need those fields to be updated properly in order for base backups
|
||||
* to work.
|
||||
*
|
||||
* This particular basebackup sink requires extra callbacks that most base
|
||||
* backup sinks don't. Rather than cramming those into the interface, we just
|
||||
* have a few extra functions here that basebackup.c can call. (We could put
|
||||
* the logic directly into that file as it's fairly simple, but it seems
|
||||
* cleaner to have everything related to progress reporting in one place.)
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_progress.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "backup/basebackup.h"
|
||||
#include "backup/basebackup_sink.h"
|
||||
#include "commands/progress.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
static void bbsink_progress_begin_backup(bbsink *sink);
|
||||
static void bbsink_progress_archive_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_progress_end_archive(bbsink *sink);
|
||||
|
||||
static const bbsink_ops bbsink_progress_ops = {
|
||||
.begin_backup = bbsink_progress_begin_backup,
|
||||
.begin_archive = bbsink_forward_begin_archive,
|
||||
.archive_contents = bbsink_progress_archive_contents,
|
||||
.end_archive = bbsink_progress_end_archive,
|
||||
.begin_manifest = bbsink_forward_begin_manifest,
|
||||
.manifest_contents = bbsink_forward_manifest_contents,
|
||||
.end_manifest = bbsink_forward_end_manifest,
|
||||
.end_backup = bbsink_forward_end_backup,
|
||||
.cleanup = bbsink_forward_cleanup
|
||||
};
|
||||
|
||||
/*
|
||||
* Create a new basebackup sink that performs progress tracking functions and
|
||||
* forwards data to a successor sink.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_progress_new(bbsink *next, bool estimate_backup_size)
|
||||
{
|
||||
bbsink *sink;
|
||||
|
||||
Assert(next != NULL);
|
||||
|
||||
sink = palloc0(sizeof(bbsink));
|
||||
*((const bbsink_ops **) &sink->bbs_ops) = &bbsink_progress_ops;
|
||||
sink->bbs_next = next;
|
||||
|
||||
/*
|
||||
* Report that a base backup is in progress, and set the total size of the
|
||||
* backup to -1, which will get translated to NULL. If we're estimating
|
||||
* the backup size, we'll insert the real estimate when we have it.
|
||||
*/
|
||||
pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
|
||||
pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1);
|
||||
|
||||
return sink;
|
||||
}
|
||||
|
||||
/*
|
||||
* Progress reporting at start of backup.
|
||||
*/
|
||||
static void
|
||||
bbsink_progress_begin_backup(bbsink *sink)
|
||||
{
|
||||
const int index[] = {
|
||||
PROGRESS_BASEBACKUP_PHASE,
|
||||
PROGRESS_BASEBACKUP_BACKUP_TOTAL,
|
||||
PROGRESS_BASEBACKUP_TBLSPC_TOTAL
|
||||
};
|
||||
int64 val[3];
|
||||
|
||||
/*
|
||||
* Report that we are now streaming database files as a base backup. Also
|
||||
* advertise the number of tablespaces, and, if known, the estimated total
|
||||
* backup size.
|
||||
*/
|
||||
val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP;
|
||||
if (sink->bbs_state->bytes_total_is_valid)
|
||||
val[1] = sink->bbs_state->bytes_total;
|
||||
else
|
||||
val[1] = -1;
|
||||
val[2] = list_length(sink->bbs_state->tablespaces);
|
||||
pgstat_progress_update_multi_param(3, index, val);
|
||||
|
||||
/* Delegate to next sink. */
|
||||
bbsink_forward_begin_backup(sink);
|
||||
}
|
||||
|
||||
/*
|
||||
* End-of archive progress reporting.
|
||||
*/
|
||||
static void
|
||||
bbsink_progress_end_archive(bbsink *sink)
|
||||
{
|
||||
/*
|
||||
* We expect one archive per tablespace, so reaching the end of an archive
|
||||
* also means reaching the end of a tablespace. (Some day we might have a
|
||||
* reason to decouple these concepts.)
|
||||
*
|
||||
* If WAL is included in the backup, we'll mark the last tablespace
|
||||
* complete before the last archive is complete, so we need a guard here
|
||||
* to ensure that the number of tablespaces streamed doesn't exceed the
|
||||
* total.
|
||||
*/
|
||||
if (sink->bbs_state->tablespace_num < list_length(sink->bbs_state->tablespaces))
|
||||
pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
|
||||
sink->bbs_state->tablespace_num + 1);
|
||||
|
||||
/* Delegate to next sink. */
|
||||
bbsink_forward_end_archive(sink);
|
||||
|
||||
/*
|
||||
* This is a convenient place to update the bbsink_state's notion of which
|
||||
* is the current tablespace. Note that the bbsink_state object is shared
|
||||
* across all bbsink objects involved, but we're the outermost one and
|
||||
* this is the very last thing we do.
|
||||
*/
|
||||
sink->bbs_state->tablespace_num++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle progress tracking for new archive contents.
|
||||
*
|
||||
* Increment the counter for the amount of data already streamed
|
||||
* by the given number of bytes, and update the progress report for
|
||||
* pg_stat_progress_basebackup.
|
||||
*/
|
||||
static void
|
||||
bbsink_progress_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_state *state = sink->bbs_state;
|
||||
const int index[] = {
|
||||
PROGRESS_BASEBACKUP_BACKUP_STREAMED,
|
||||
PROGRESS_BASEBACKUP_BACKUP_TOTAL
|
||||
};
|
||||
int64 val[2];
|
||||
int nparam = 0;
|
||||
|
||||
/* First update bbsink_state with # of bytes done. */
|
||||
state->bytes_done += len;
|
||||
|
||||
/* Now forward to next sink. */
|
||||
bbsink_forward_archive_contents(sink, len);
|
||||
|
||||
/* Prepare to set # of bytes done for command progress reporting. */
|
||||
val[nparam++] = state->bytes_done;
|
||||
|
||||
/*
|
||||
* We may also want to update # of total bytes, to avoid overflowing past
|
||||
* 100% or the full size. This may make the total size number change as we
|
||||
* approach the end of the backup (the estimate will always be wrong if
|
||||
* WAL is included), but that's better than having the done column be
|
||||
* bigger than the total.
|
||||
*/
|
||||
if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total)
|
||||
val[nparam++] = state->bytes_done;
|
||||
|
||||
pgstat_progress_update_multi_param(nparam, index, val);
|
||||
}
|
||||
|
||||
/*
|
||||
* Advertise that we are waiting for the start-of-backup checkpoint.
|
||||
*/
|
||||
void
|
||||
basebackup_progress_wait_checkpoint(void)
|
||||
{
|
||||
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
|
||||
PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
|
||||
}
|
||||
|
||||
/*
|
||||
* Advertise that we are estimating the backup size.
|
||||
*/
|
||||
void
|
||||
basebackup_progress_estimate_backup_size(void)
|
||||
{
|
||||
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
|
||||
PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
|
||||
}
|
||||
|
||||
/*
|
||||
* Advertise that we are waiting for WAL archiving at end-of-backup.
|
||||
*/
|
||||
void
|
||||
basebackup_progress_wait_wal_archive(bbsink_state *state)
|
||||
{
|
||||
const int index[] = {
|
||||
PROGRESS_BASEBACKUP_PHASE,
|
||||
PROGRESS_BASEBACKUP_TBLSPC_STREAMED
|
||||
};
|
||||
int64 val[2];
|
||||
|
||||
/*
|
||||
* We report having finished all tablespaces at this point, even if the
|
||||
* archive for the main tablespace is still open, because what's going to
|
||||
* be added is WAL files, not files that are really from the main
|
||||
* tablespace.
|
||||
*/
|
||||
val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE;
|
||||
val[1] = list_length(state->tablespaces);
|
||||
pgstat_progress_update_multi_param(2, index, val);
|
||||
}
|
||||
|
||||
/*
|
||||
* Advertise that we are transferring WAL files into the final archive.
|
||||
*/
|
||||
void
|
||||
basebackup_progress_transfer_wal(void)
|
||||
{
|
||||
pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
|
||||
PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Advertise that we are no longer performing a backup.
|
||||
*/
|
||||
void
|
||||
basebackup_progress_done(void)
|
||||
{
|
||||
pgstat_progress_end_command();
|
||||
}
|
309
src/backend/backup/basebackup_server.c
Normal file
309
src/backend/backup/basebackup_server.c
Normal file
@@ -0,0 +1,309 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_server.c
|
||||
* store basebackup archives on the server
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_server.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "backup/basebackup.h"
|
||||
#include "backup/basebackup_sink.h"
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/timestamp.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
typedef struct bbsink_server
|
||||
{
|
||||
/* Common information for all types of sink. */
|
||||
bbsink base;
|
||||
|
||||
/* Directory in which backup is to be stored. */
|
||||
char *pathname;
|
||||
|
||||
/* Currently open file (or 0 if nothing open). */
|
||||
File file;
|
||||
|
||||
/* Current file position. */
|
||||
off_t filepos;
|
||||
} bbsink_server;
|
||||
|
||||
static void bbsink_server_begin_archive(bbsink *sink,
|
||||
const char *archive_name);
|
||||
static void bbsink_server_archive_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_server_end_archive(bbsink *sink);
|
||||
static void bbsink_server_begin_manifest(bbsink *sink);
|
||||
static void bbsink_server_manifest_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_server_end_manifest(bbsink *sink);
|
||||
|
||||
static const bbsink_ops bbsink_server_ops = {
|
||||
.begin_backup = bbsink_forward_begin_backup,
|
||||
.begin_archive = bbsink_server_begin_archive,
|
||||
.archive_contents = bbsink_server_archive_contents,
|
||||
.end_archive = bbsink_server_end_archive,
|
||||
.begin_manifest = bbsink_server_begin_manifest,
|
||||
.manifest_contents = bbsink_server_manifest_contents,
|
||||
.end_manifest = bbsink_server_end_manifest,
|
||||
.end_backup = bbsink_forward_end_backup,
|
||||
.cleanup = bbsink_forward_cleanup
|
||||
};
|
||||
|
||||
/*
|
||||
* Create a new 'server' bbsink.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_server_new(bbsink *next, char *pathname)
|
||||
{
|
||||
bbsink_server *sink = palloc0(sizeof(bbsink_server));
|
||||
|
||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_server_ops;
|
||||
sink->pathname = pathname;
|
||||
sink->base.bbs_next = next;
|
||||
|
||||
/* Replication permission is not sufficient in this case. */
|
||||
StartTransactionCommand();
|
||||
if (!has_privs_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser or a role with privileges of the pg_write_server_files role to create server backup")));
|
||||
CommitTransactionCommand();
|
||||
|
||||
/*
|
||||
* It's not a good idea to store your backups in the same directory that
|
||||
* you're backing up. If we allowed a relative path here, that could
|
||||
* easily happen accidentally, so we don't. The user could still
|
||||
* accomplish the same thing by including the absolute path to $PGDATA in
|
||||
* the pathname, but that's likely an intentional bad decision rather than
|
||||
* an accident.
|
||||
*/
|
||||
if (!is_absolute_path(pathname))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_NAME),
|
||||
errmsg("relative path not allowed for server backup")));
|
||||
|
||||
switch (pg_check_dir(pathname))
|
||||
{
|
||||
case 0:
|
||||
|
||||
/*
|
||||
* Does not exist, so create it using the same permissions we'd
|
||||
* use for a new subdirectory of the data directory itself.
|
||||
*/
|
||||
if (MakePGDirectory(pathname) < 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not create directory \"%s\": %m", pathname)));
|
||||
break;
|
||||
|
||||
case 1:
|
||||
/* Exists, empty. */
|
||||
break;
|
||||
|
||||
case 2:
|
||||
case 3:
|
||||
case 4:
|
||||
/* Exists, not empty. */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DUPLICATE_FILE),
|
||||
errmsg("directory \"%s\" exists but is not empty",
|
||||
pathname)));
|
||||
break;
|
||||
|
||||
default:
|
||||
/* Access problem. */
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not access directory \"%s\": %m",
|
||||
pathname)));
|
||||
}
|
||||
|
||||
return &sink->base;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open the correct output file for this archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_server_begin_archive(bbsink *sink, const char *archive_name)
|
||||
{
|
||||
bbsink_server *mysink = (bbsink_server *) sink;
|
||||
char *filename;
|
||||
|
||||
Assert(mysink->file == 0);
|
||||
Assert(mysink->filepos == 0);
|
||||
|
||||
filename = psprintf("%s/%s", mysink->pathname, archive_name);
|
||||
|
||||
mysink->file = PathNameOpenFile(filename,
|
||||
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
|
||||
if (mysink->file <= 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not create file \"%s\": %m", filename)));
|
||||
|
||||
pfree(filename);
|
||||
|
||||
bbsink_forward_begin_archive(sink, archive_name);
|
||||
}
|
||||
|
||||
/*
|
||||
* Write the data to the output file.
|
||||
*/
|
||||
static void
|
||||
bbsink_server_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_server *mysink = (bbsink_server *) sink;
|
||||
int nbytes;
|
||||
|
||||
nbytes = FileWrite(mysink->file, mysink->base.bbs_buffer, len,
|
||||
mysink->filepos, WAIT_EVENT_BASEBACKUP_WRITE);
|
||||
|
||||
if (nbytes != len)
|
||||
{
|
||||
if (nbytes < 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m",
|
||||
FilePathName(mysink->file)),
|
||||
errhint("Check free disk space.")));
|
||||
/* short write: complain appropriately */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DISK_FULL),
|
||||
errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
|
||||
FilePathName(mysink->file),
|
||||
nbytes, (int) len, (unsigned) mysink->filepos),
|
||||
errhint("Check free disk space.")));
|
||||
}
|
||||
|
||||
mysink->filepos += nbytes;
|
||||
|
||||
bbsink_forward_archive_contents(sink, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* fsync and close the current output file.
|
||||
*/
|
||||
static void
|
||||
bbsink_server_end_archive(bbsink *sink)
|
||||
{
|
||||
bbsink_server *mysink = (bbsink_server *) sink;
|
||||
|
||||
/*
|
||||
* We intentionally don't use data_sync_elevel here, because the server
|
||||
* shouldn't PANIC just because we can't guarantee that the backup has
|
||||
* been written down to disk. Running recovery won't fix anything in this
|
||||
* case anyway.
|
||||
*/
|
||||
if (FileSync(mysink->file, WAIT_EVENT_BASEBACKUP_SYNC) < 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not fsync file \"%s\": %m",
|
||||
FilePathName(mysink->file))));
|
||||
|
||||
|
||||
/* We're done with this file now. */
|
||||
FileClose(mysink->file);
|
||||
mysink->file = 0;
|
||||
mysink->filepos = 0;
|
||||
|
||||
bbsink_forward_end_archive(sink);
|
||||
}
|
||||
|
||||
/*
|
||||
* Open the output file to which we will write the manifest.
|
||||
*
|
||||
* Just like pg_basebackup, we write the manifest first under a temporary
|
||||
* name and then rename it into place after fsync. That way, if the manifest
|
||||
* is there and under the correct name, the user can be sure that the backup
|
||||
* completed.
|
||||
*/
|
||||
static void
|
||||
bbsink_server_begin_manifest(bbsink *sink)
|
||||
{
|
||||
bbsink_server *mysink = (bbsink_server *) sink;
|
||||
char *tmp_filename;
|
||||
|
||||
Assert(mysink->file == 0);
|
||||
|
||||
tmp_filename = psprintf("%s/backup_manifest.tmp", mysink->pathname);
|
||||
|
||||
mysink->file = PathNameOpenFile(tmp_filename,
|
||||
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
|
||||
if (mysink->file <= 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not create file \"%s\": %m", tmp_filename)));
|
||||
|
||||
pfree(tmp_filename);
|
||||
|
||||
bbsink_forward_begin_manifest(sink);
|
||||
}
|
||||
|
||||
/*
|
||||
* Each chunk of manifest data is sent using a CopyData message.
|
||||
*/
|
||||
static void
|
||||
bbsink_server_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_server *mysink = (bbsink_server *) sink;
|
||||
int nbytes;
|
||||
|
||||
nbytes = FileWrite(mysink->file, mysink->base.bbs_buffer, len,
|
||||
mysink->filepos, WAIT_EVENT_BASEBACKUP_WRITE);
|
||||
|
||||
if (nbytes != len)
|
||||
{
|
||||
if (nbytes < 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m",
|
||||
FilePathName(mysink->file)),
|
||||
errhint("Check free disk space.")));
|
||||
/* short write: complain appropriately */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DISK_FULL),
|
||||
errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
|
||||
FilePathName(mysink->file),
|
||||
nbytes, (int) len, (unsigned) mysink->filepos),
|
||||
errhint("Check free disk space.")));
|
||||
}
|
||||
|
||||
mysink->filepos += nbytes;
|
||||
|
||||
bbsink_forward_manifest_contents(sink, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* fsync the backup manifest, close the file, and then rename it into place.
|
||||
*/
|
||||
static void
|
||||
bbsink_server_end_manifest(bbsink *sink)
|
||||
{
|
||||
bbsink_server *mysink = (bbsink_server *) sink;
|
||||
char *tmp_filename;
|
||||
char *filename;
|
||||
|
||||
/* We're done with this file now. */
|
||||
FileClose(mysink->file);
|
||||
mysink->file = 0;
|
||||
|
||||
/*
|
||||
* Rename it into place. This also fsyncs the temporary file, so we don't
|
||||
* need to do that here. We don't use data_sync_elevel here for the same
|
||||
* reasons as in bbsink_server_end_archive.
|
||||
*/
|
||||
tmp_filename = psprintf("%s/backup_manifest.tmp", mysink->pathname);
|
||||
filename = psprintf("%s/backup_manifest", mysink->pathname);
|
||||
durable_rename(tmp_filename, filename, ERROR);
|
||||
pfree(filename);
|
||||
pfree(tmp_filename);
|
||||
|
||||
bbsink_forward_end_manifest(sink);
|
||||
}
|
125
src/backend/backup/basebackup_sink.c
Normal file
125
src/backend/backup/basebackup_sink.c
Normal file
@@ -0,0 +1,125 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_sink.c
|
||||
* Default implementations for bbsink (basebackup sink) callbacks.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/backend/backup/basebackup_sink.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "backup/basebackup_sink.h"
|
||||
|
||||
/*
|
||||
* Forward begin_backup callback.
|
||||
*
|
||||
* Only use this implementation if you want the bbsink you're implementing to
|
||||
* share a buffer with the successor bbsink.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_begin_backup(bbsink *sink)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
Assert(sink->bbs_state != NULL);
|
||||
bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
|
||||
sink->bbs_buffer_length);
|
||||
sink->bbs_buffer = sink->bbs_next->bbs_buffer;
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward begin_archive callback.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_begin_archive(sink->bbs_next, archive_name);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward archive_contents callback.
|
||||
*
|
||||
* Code that wants to use this should initialize its own bbs_buffer and
|
||||
* bbs_buffer_length fields to the values from the successor sink. In cases
|
||||
* where the buffer isn't shared, the data needs to be copied before forwarding
|
||||
* the callback. We don't do try to do that here, because there's really no
|
||||
* reason to have separately allocated buffers containing the same identical
|
||||
* data.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
|
||||
Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
|
||||
bbsink_archive_contents(sink->bbs_next, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward end_archive callback.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_end_archive(bbsink *sink)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_end_archive(sink->bbs_next);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward begin_manifest callback.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_begin_manifest(bbsink *sink)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_begin_manifest(sink->bbs_next);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward manifest_contents callback.
|
||||
*
|
||||
* As with the archive_contents callback, it's expected that the buffer is
|
||||
* shared.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
|
||||
Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
|
||||
bbsink_manifest_contents(sink->bbs_next, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward end_manifest callback.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_end_manifest(bbsink *sink)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_end_manifest(sink->bbs_next);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward end_backup callback.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_end_backup(sink->bbs_next, endptr, endtli);
|
||||
}
|
||||
|
||||
/*
|
||||
* Forward cleanup callback.
|
||||
*/
|
||||
void
|
||||
bbsink_forward_cleanup(bbsink *sink)
|
||||
{
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_cleanup(sink->bbs_next);
|
||||
}
|
241
src/backend/backup/basebackup_target.c
Normal file
241
src/backend/backup/basebackup_target.c
Normal file
@@ -0,0 +1,241 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_target.c
|
||||
* Base backups can be "targeted", which means that they can be sent
|
||||
* somewhere other than to the client which requested the backup.
|
||||
* Furthermore, new targets can be defined by extensions. This file
|
||||
* contains code to support that functionality.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_target.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "backup/basebackup_target.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
typedef struct BaseBackupTargetType
|
||||
{
|
||||
char *name;
|
||||
void *(*check_detail) (char *, char *);
|
||||
bbsink *(*get_sink) (bbsink *, void *);
|
||||
} BaseBackupTargetType;
|
||||
|
||||
struct BaseBackupTargetHandle
|
||||
{
|
||||
BaseBackupTargetType *type;
|
||||
void *detail_arg;
|
||||
};
|
||||
|
||||
static void initialize_target_list(void);
|
||||
static bbsink *blackhole_get_sink(bbsink *next_sink, void *detail_arg);
|
||||
static bbsink *server_get_sink(bbsink *next_sink, void *detail_arg);
|
||||
static void *reject_target_detail(char *target, char *target_detail);
|
||||
static void *server_check_detail(char *target, char *target_detail);
|
||||
|
||||
static BaseBackupTargetType builtin_backup_targets[] =
|
||||
{
|
||||
{
|
||||
"blackhole", reject_target_detail, blackhole_get_sink
|
||||
},
|
||||
{
|
||||
"server", server_check_detail, server_get_sink
|
||||
},
|
||||
{
|
||||
NULL
|
||||
}
|
||||
};
|
||||
|
||||
static List *BaseBackupTargetTypeList = NIL;
|
||||
|
||||
/*
|
||||
* Add a new base backup target type.
|
||||
*
|
||||
* This is intended for use by server extensions.
|
||||
*/
|
||||
void
|
||||
BaseBackupAddTarget(char *name,
|
||||
void *(*check_detail) (char *, char *),
|
||||
bbsink *(*get_sink) (bbsink *, void *))
|
||||
{
|
||||
BaseBackupTargetType *ttype;
|
||||
MemoryContext oldcontext;
|
||||
ListCell *lc;
|
||||
|
||||
/* If the target list is not yet initialized, do that first. */
|
||||
if (BaseBackupTargetTypeList == NIL)
|
||||
initialize_target_list();
|
||||
|
||||
/* Search the target type list for an existing entry with this name. */
|
||||
foreach(lc, BaseBackupTargetTypeList)
|
||||
{
|
||||
BaseBackupTargetType *ttype = lfirst(lc);
|
||||
|
||||
if (strcmp(ttype->name, name) == 0)
|
||||
{
|
||||
/*
|
||||
* We found one, so update it.
|
||||
*
|
||||
* It is probably not a great idea to call BaseBackupAddTarget for
|
||||
* the same name multiple times, but if it happens, this seems
|
||||
* like the sanest behavior.
|
||||
*/
|
||||
ttype->check_detail = check_detail;
|
||||
ttype->get_sink = get_sink;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We use TopMemoryContext for allocations here to make sure that the data
|
||||
* we need doesn't vanish under us; that's also why we copy the target
|
||||
* name into a newly-allocated chunk of memory.
|
||||
*/
|
||||
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
|
||||
ttype = palloc(sizeof(BaseBackupTargetType));
|
||||
ttype->name = pstrdup(name);
|
||||
ttype->check_detail = check_detail;
|
||||
ttype->get_sink = get_sink;
|
||||
BaseBackupTargetTypeList = lappend(BaseBackupTargetTypeList, ttype);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
/*
|
||||
* Look up a base backup target and validate the target_detail.
|
||||
*
|
||||
* Extensions that define new backup targets will probably define a new
|
||||
* type of bbsink to match. Validation of the target_detail can be performed
|
||||
* either in the check_detail routine called here, or in the bbsink
|
||||
* constructor, which will be called from BaseBackupGetSink. It's mostly
|
||||
* a matter of taste, but the check_detail function runs somewhat earlier.
|
||||
*/
|
||||
BaseBackupTargetHandle *
|
||||
BaseBackupGetTargetHandle(char *target, char *target_detail)
|
||||
{
|
||||
ListCell *lc;
|
||||
|
||||
/* If the target list is not yet initialized, do that first. */
|
||||
if (BaseBackupTargetTypeList == NIL)
|
||||
initialize_target_list();
|
||||
|
||||
/* Search the target type list for a match. */
|
||||
foreach(lc, BaseBackupTargetTypeList)
|
||||
{
|
||||
BaseBackupTargetType *ttype = lfirst(lc);
|
||||
|
||||
if (strcmp(ttype->name, target) == 0)
|
||||
{
|
||||
BaseBackupTargetHandle *handle;
|
||||
|
||||
/* Found the target. */
|
||||
handle = palloc(sizeof(BaseBackupTargetHandle));
|
||||
handle->type = ttype;
|
||||
handle->detail_arg = ttype->check_detail(target, target_detail);
|
||||
|
||||
return handle;
|
||||
}
|
||||
}
|
||||
|
||||
/* Did not find the target. */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("unrecognized target: \"%s\"", target)));
|
||||
|
||||
/* keep compiler quiet */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Construct a bbsink that will implement the backup target.
|
||||
*
|
||||
* The get_sink function does all the real work, so all we have to do here
|
||||
* is call it with the correct arguments. Whatever the check_detail function
|
||||
* returned is here passed through to the get_sink function. This lets those
|
||||
* two functions communicate with each other, if they wish. If not, the
|
||||
* check_detail function can simply return the target_detail and let the
|
||||
* get_sink function take it from there.
|
||||
*/
|
||||
bbsink *
|
||||
BaseBackupGetSink(BaseBackupTargetHandle *handle, bbsink *next_sink)
|
||||
{
|
||||
return handle->type->get_sink(next_sink, handle->detail_arg);
|
||||
}
|
||||
|
||||
/*
|
||||
* Load predefined target types into BaseBackupTargetTypeList.
|
||||
*/
|
||||
static void
|
||||
initialize_target_list(void)
|
||||
{
|
||||
BaseBackupTargetType *ttype = builtin_backup_targets;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
|
||||
while (ttype->name != NULL)
|
||||
{
|
||||
BaseBackupTargetTypeList = lappend(BaseBackupTargetTypeList, ttype);
|
||||
++ttype;
|
||||
}
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
/*
|
||||
* Normally, a get_sink function should construct and return a new bbsink that
|
||||
* implements the backup target, but the 'blackhole' target just throws the
|
||||
* data away. We could implement that by adding a bbsink that does nothing
|
||||
* but forward, but it's even cheaper to implement that by not adding a bbsink
|
||||
* at all.
|
||||
*/
|
||||
static bbsink *
|
||||
blackhole_get_sink(bbsink *next_sink, void *detail_arg)
|
||||
{
|
||||
return next_sink;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a bbsink implementing a server-side backup.
|
||||
*/
|
||||
static bbsink *
|
||||
server_get_sink(bbsink *next_sink, void *detail_arg)
|
||||
{
|
||||
return bbsink_server_new(next_sink, detail_arg);
|
||||
}
|
||||
|
||||
/*
|
||||
* Implement target-detail checking for a target that does not accept a
|
||||
* detail.
|
||||
*/
|
||||
static void *
|
||||
reject_target_detail(char *target, char *target_detail)
|
||||
{
|
||||
if (target_detail != NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("target '%s' does not accept a target detail",
|
||||
target)));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Implement target-detail checking for a server-side backup.
|
||||
*
|
||||
* target_detail should be the name of the directory to which the backup
|
||||
* should be written, but we don't check that here. Rather, that check,
|
||||
* as well as the necessary permissions checking, happens in bbsink_server_new.
|
||||
*/
|
||||
static void *
|
||||
server_check_detail(char *target, char *target_detail)
|
||||
{
|
||||
if (target_detail == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("target '%s' requires a target detail",
|
||||
target)));
|
||||
|
||||
return target_detail;
|
||||
}
|
199
src/backend/backup/basebackup_throttle.c
Normal file
199
src/backend/backup/basebackup_throttle.c
Normal file
@@ -0,0 +1,199 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_throttle.c
|
||||
* Basebackup sink implementing throttling. Data is forwarded to the
|
||||
* next base backup sink in the chain at a rate no greater than the
|
||||
* configured maximum.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_throttle.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "backup/basebackup_sink.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
typedef struct bbsink_throttle
|
||||
{
|
||||
/* Common information for all types of sink. */
|
||||
bbsink base;
|
||||
|
||||
/* The actual number of bytes, transfer of which may cause sleep. */
|
||||
uint64 throttling_sample;
|
||||
|
||||
/* Amount of data already transferred but not yet throttled. */
|
||||
int64 throttling_counter;
|
||||
|
||||
/* The minimum time required to transfer throttling_sample bytes. */
|
||||
TimeOffset elapsed_min_unit;
|
||||
|
||||
/* The last check of the transfer rate. */
|
||||
TimestampTz throttled_last;
|
||||
} bbsink_throttle;
|
||||
|
||||
static void bbsink_throttle_begin_backup(bbsink *sink);
|
||||
static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
|
||||
static void throttle(bbsink_throttle *sink, size_t increment);
|
||||
|
||||
static const bbsink_ops bbsink_throttle_ops = {
|
||||
.begin_backup = bbsink_throttle_begin_backup,
|
||||
.begin_archive = bbsink_forward_begin_archive,
|
||||
.archive_contents = bbsink_throttle_archive_contents,
|
||||
.end_archive = bbsink_forward_end_archive,
|
||||
.begin_manifest = bbsink_forward_begin_manifest,
|
||||
.manifest_contents = bbsink_throttle_manifest_contents,
|
||||
.end_manifest = bbsink_forward_end_manifest,
|
||||
.end_backup = bbsink_forward_end_backup,
|
||||
.cleanup = bbsink_forward_cleanup
|
||||
};
|
||||
|
||||
/*
|
||||
* How frequently to throttle, as a fraction of the specified rate-second.
|
||||
*/
|
||||
#define THROTTLING_FREQUENCY 8
|
||||
|
||||
/*
|
||||
* Create a new basebackup sink that performs throttling and forwards data
|
||||
* to a successor sink.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_throttle_new(bbsink *next, uint32 maxrate)
|
||||
{
|
||||
bbsink_throttle *sink;
|
||||
|
||||
Assert(next != NULL);
|
||||
Assert(maxrate > 0);
|
||||
|
||||
sink = palloc0(sizeof(bbsink_throttle));
|
||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
|
||||
sink->base.bbs_next = next;
|
||||
|
||||
sink->throttling_sample =
|
||||
(int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
|
||||
|
||||
/*
|
||||
* The minimum amount of time for throttling_sample bytes to be
|
||||
* transferred.
|
||||
*/
|
||||
sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
|
||||
|
||||
return &sink->base;
|
||||
}
|
||||
|
||||
/*
|
||||
* There's no real work to do here, but we need to record the current time so
|
||||
* that it can be used for future calculations.
|
||||
*/
|
||||
static void
|
||||
bbsink_throttle_begin_backup(bbsink *sink)
|
||||
{
|
||||
bbsink_throttle *mysink = (bbsink_throttle *) sink;
|
||||
|
||||
bbsink_forward_begin_backup(sink);
|
||||
|
||||
/* The 'real data' starts now (header was ignored). */
|
||||
mysink->throttled_last = GetCurrentTimestamp();
|
||||
}
|
||||
|
||||
/*
|
||||
* First throttle, and then pass archive contents to next sink.
|
||||
*/
|
||||
static void
|
||||
bbsink_throttle_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
throttle((bbsink_throttle *) sink, len);
|
||||
|
||||
bbsink_forward_archive_contents(sink, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* First throttle, and then pass manifest contents to next sink.
|
||||
*/
|
||||
static void
|
||||
bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
throttle((bbsink_throttle *) sink, len);
|
||||
|
||||
bbsink_forward_manifest_contents(sink, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* Increment the network transfer counter by the given number of bytes,
|
||||
* and sleep if necessary to comply with the requested network transfer
|
||||
* rate.
|
||||
*/
|
||||
static void
|
||||
throttle(bbsink_throttle *sink, size_t increment)
|
||||
{
|
||||
TimeOffset elapsed_min;
|
||||
|
||||
Assert(sink->throttling_counter >= 0);
|
||||
|
||||
sink->throttling_counter += increment;
|
||||
if (sink->throttling_counter < sink->throttling_sample)
|
||||
return;
|
||||
|
||||
/* How much time should have elapsed at minimum? */
|
||||
elapsed_min = sink->elapsed_min_unit *
|
||||
(sink->throttling_counter / sink->throttling_sample);
|
||||
|
||||
/*
|
||||
* Since the latch could be set repeatedly because of concurrently WAL
|
||||
* activity, sleep in a loop to ensure enough time has passed.
|
||||
*/
|
||||
for (;;)
|
||||
{
|
||||
TimeOffset elapsed,
|
||||
sleep;
|
||||
int wait_result;
|
||||
|
||||
/* Time elapsed since the last measurement (and possible wake up). */
|
||||
elapsed = GetCurrentTimestamp() - sink->throttled_last;
|
||||
|
||||
/* sleep if the transfer is faster than it should be */
|
||||
sleep = elapsed_min - elapsed;
|
||||
if (sleep <= 0)
|
||||
break;
|
||||
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/* We're eating a potentially set latch, so check for interrupts */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
|
||||
* the maximum time to sleep. Thus the cast to long is safe.
|
||||
*/
|
||||
wait_result = WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
||||
(long) (sleep / 1000),
|
||||
WAIT_EVENT_BASE_BACKUP_THROTTLE);
|
||||
|
||||
if (wait_result & WL_LATCH_SET)
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Done waiting? */
|
||||
if (wait_result & WL_TIMEOUT)
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* As we work with integers, only whole multiple of throttling_sample was
|
||||
* processed. The rest will be done during the next call of this function.
|
||||
*/
|
||||
sink->throttling_counter %= sink->throttling_sample;
|
||||
|
||||
/*
|
||||
* Time interval for the remaining amount and possible next increments
|
||||
* starts now.
|
||||
*/
|
||||
sink->throttled_last = GetCurrentTimestamp();
|
||||
}
|
316
src/backend/backup/basebackup_zstd.c
Normal file
316
src/backend/backup/basebackup_zstd.c
Normal file
@@ -0,0 +1,316 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup_zstd.c
|
||||
* Basebackup sink implementing zstd compression.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/backup/basebackup_zstd.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#ifdef USE_ZSTD
|
||||
#include <zstd.h>
|
||||
#endif
|
||||
|
||||
#include "backup/basebackup_sink.h"
|
||||
|
||||
#ifdef USE_ZSTD
|
||||
|
||||
typedef struct bbsink_zstd
|
||||
{
|
||||
/* Common information for all types of sink. */
|
||||
bbsink base;
|
||||
|
||||
/* Compression options */
|
||||
pg_compress_specification *compress;
|
||||
|
||||
ZSTD_CCtx *cctx;
|
||||
ZSTD_outBuffer zstd_outBuf;
|
||||
} bbsink_zstd;
|
||||
|
||||
static void bbsink_zstd_begin_backup(bbsink *sink);
|
||||
static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
|
||||
static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
|
||||
static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
|
||||
static void bbsink_zstd_end_archive(bbsink *sink);
|
||||
static void bbsink_zstd_cleanup(bbsink *sink);
|
||||
static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
|
||||
TimeLineID endtli);
|
||||
|
||||
static const bbsink_ops bbsink_zstd_ops = {
|
||||
.begin_backup = bbsink_zstd_begin_backup,
|
||||
.begin_archive = bbsink_zstd_begin_archive,
|
||||
.archive_contents = bbsink_zstd_archive_contents,
|
||||
.end_archive = bbsink_zstd_end_archive,
|
||||
.begin_manifest = bbsink_forward_begin_manifest,
|
||||
.manifest_contents = bbsink_zstd_manifest_contents,
|
||||
.end_manifest = bbsink_forward_end_manifest,
|
||||
.end_backup = bbsink_zstd_end_backup,
|
||||
.cleanup = bbsink_zstd_cleanup
|
||||
};
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Create a new basebackup sink that performs zstd compression.
|
||||
*/
|
||||
bbsink *
|
||||
bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
|
||||
{
|
||||
#ifndef USE_ZSTD
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("zstd compression is not supported by this build")));
|
||||
return NULL; /* keep compiler quiet */
|
||||
#else
|
||||
bbsink_zstd *sink;
|
||||
|
||||
Assert(next != NULL);
|
||||
|
||||
sink = palloc0(sizeof(bbsink_zstd));
|
||||
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
|
||||
sink->base.bbs_next = next;
|
||||
sink->compress = compress;
|
||||
|
||||
return &sink->base;
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef USE_ZSTD
|
||||
|
||||
/*
|
||||
* Begin backup.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_begin_backup(bbsink *sink)
|
||||
{
|
||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||
size_t output_buffer_bound;
|
||||
size_t ret;
|
||||
pg_compress_specification *compress = mysink->compress;
|
||||
|
||||
mysink->cctx = ZSTD_createCCtx();
|
||||
if (!mysink->cctx)
|
||||
elog(ERROR, "could not create zstd compression context");
|
||||
|
||||
if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) != 0)
|
||||
{
|
||||
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
|
||||
compress->level);
|
||||
if (ZSTD_isError(ret))
|
||||
elog(ERROR, "could not set zstd compression level to %d: %s",
|
||||
compress->level, ZSTD_getErrorName(ret));
|
||||
}
|
||||
|
||||
if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
|
||||
{
|
||||
/*
|
||||
* On older versions of libzstd, this option does not exist, and
|
||||
* trying to set it will fail. Similarly for newer versions if they
|
||||
* are compiled without threading support.
|
||||
*/
|
||||
ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
|
||||
compress->workers);
|
||||
if (ZSTD_isError(ret))
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("could not set compression worker count to %d: %s",
|
||||
compress->workers, ZSTD_getErrorName(ret)));
|
||||
}
|
||||
|
||||
/*
|
||||
* We need our own buffer, because we're going to pass different data to
|
||||
* the next sink than what gets passed to us.
|
||||
*/
|
||||
mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
|
||||
|
||||
/*
|
||||
* Make sure that the next sink's bbs_buffer is big enough to accommodate
|
||||
* the compressed input buffer.
|
||||
*/
|
||||
output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
|
||||
|
||||
/*
|
||||
* The buffer length is expected to be a multiple of BLCKSZ, so round up.
|
||||
*/
|
||||
output_buffer_bound = output_buffer_bound + BLCKSZ -
|
||||
(output_buffer_bound % BLCKSZ);
|
||||
|
||||
bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare to compress the next archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
|
||||
{
|
||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||
char *zstd_archive_name;
|
||||
|
||||
/*
|
||||
* At the start of each archive we reset the state to start a new
|
||||
* compression operation. The parameters are sticky and they will stick
|
||||
* around as we are resetting with option ZSTD_reset_session_only.
|
||||
*/
|
||||
ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
|
||||
|
||||
mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
|
||||
mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
|
||||
mysink->zstd_outBuf.pos = 0;
|
||||
|
||||
/* Add ".zst" to the archive name. */
|
||||
zstd_archive_name = psprintf("%s.zst", archive_name);
|
||||
Assert(sink->bbs_next != NULL);
|
||||
bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
|
||||
pfree(zstd_archive_name);
|
||||
}
|
||||
|
||||
/*
|
||||
* Compress the input data to the output buffer until we run out of input
|
||||
* data. Each time the output buffer falls below the compression bound for
|
||||
* the input buffer, invoke the archive_contents() method for the next sink.
|
||||
*
|
||||
* Note that since we're compressing the input, it may very commonly happen
|
||||
* that we consume all the input data without filling the output buffer. In
|
||||
* that case, the compressed representation of the current input data won't
|
||||
* actually be sent to the next bbsink until a later call to this function,
|
||||
* or perhaps even not until bbsink_zstd_end_archive() is invoked.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_archive_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||
ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
|
||||
|
||||
while (inBuf.pos < inBuf.size)
|
||||
{
|
||||
size_t yet_to_flush;
|
||||
size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
|
||||
|
||||
/*
|
||||
* If the out buffer is not left with enough space, send the output
|
||||
* buffer to the next sink, and reset it.
|
||||
*/
|
||||
if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
|
||||
{
|
||||
bbsink_archive_contents(mysink->base.bbs_next,
|
||||
mysink->zstd_outBuf.pos);
|
||||
mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
|
||||
mysink->zstd_outBuf.size =
|
||||
mysink->base.bbs_next->bbs_buffer_length;
|
||||
mysink->zstd_outBuf.pos = 0;
|
||||
}
|
||||
|
||||
yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
|
||||
&inBuf, ZSTD_e_continue);
|
||||
|
||||
if (ZSTD_isError(yet_to_flush))
|
||||
elog(ERROR,
|
||||
"could not compress data: %s",
|
||||
ZSTD_getErrorName(yet_to_flush));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* There might be some data inside zstd's internal buffers; we need to get that
|
||||
* flushed out, also end the zstd frame and then get that forwarded to the
|
||||
* successor sink as archive content.
|
||||
*
|
||||
* Then we can end processing for this archive.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_end_archive(bbsink *sink)
|
||||
{
|
||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||
size_t yet_to_flush;
|
||||
|
||||
do
|
||||
{
|
||||
ZSTD_inBuffer in = {NULL, 0, 0};
|
||||
size_t max_needed = ZSTD_compressBound(0);
|
||||
|
||||
/*
|
||||
* If the out buffer is not left with enough space, send the output
|
||||
* buffer to the next sink, and reset it.
|
||||
*/
|
||||
if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
|
||||
{
|
||||
bbsink_archive_contents(mysink->base.bbs_next,
|
||||
mysink->zstd_outBuf.pos);
|
||||
mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
|
||||
mysink->zstd_outBuf.size =
|
||||
mysink->base.bbs_next->bbs_buffer_length;
|
||||
mysink->zstd_outBuf.pos = 0;
|
||||
}
|
||||
|
||||
yet_to_flush = ZSTD_compressStream2(mysink->cctx,
|
||||
&mysink->zstd_outBuf,
|
||||
&in, ZSTD_e_end);
|
||||
|
||||
if (ZSTD_isError(yet_to_flush))
|
||||
elog(ERROR, "could not compress data: %s",
|
||||
ZSTD_getErrorName(yet_to_flush));
|
||||
|
||||
} while (yet_to_flush > 0);
|
||||
|
||||
/* Make sure to pass any remaining bytes to the next sink. */
|
||||
if (mysink->zstd_outBuf.pos > 0)
|
||||
bbsink_archive_contents(mysink->base.bbs_next,
|
||||
mysink->zstd_outBuf.pos);
|
||||
|
||||
/* Pass on the information that this archive has ended. */
|
||||
bbsink_forward_end_archive(sink);
|
||||
}
|
||||
|
||||
/*
|
||||
* Free the resources and context.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
|
||||
TimeLineID endtli)
|
||||
{
|
||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||
|
||||
/* Release the context. */
|
||||
if (mysink->cctx)
|
||||
{
|
||||
ZSTD_freeCCtx(mysink->cctx);
|
||||
mysink->cctx = NULL;
|
||||
}
|
||||
|
||||
bbsink_forward_end_backup(sink, endptr, endtli);
|
||||
}
|
||||
|
||||
/*
|
||||
* Manifest contents are not compressed, but we do need to copy them into
|
||||
* the successor sink's buffer, because we have our own.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
|
||||
{
|
||||
memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
|
||||
bbsink_manifest_contents(sink->bbs_next, len);
|
||||
}
|
||||
|
||||
/*
|
||||
* In case the backup fails, make sure we free any compression context that
|
||||
* got allocated, so that we don't leak memory.
|
||||
*/
|
||||
static void
|
||||
bbsink_zstd_cleanup(bbsink *sink)
|
||||
{
|
||||
bbsink_zstd *mysink = (bbsink_zstd *) sink;
|
||||
|
||||
/* Release the context if not already released. */
|
||||
if (mysink->cctx)
|
||||
{
|
||||
ZSTD_freeCCtx(mysink->cctx);
|
||||
mysink->cctx = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
Reference in New Issue
Block a user