1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-19 13:42:17 +03:00

Move astreamer (except astreamer_inject) to fe_utils.

This allows the code to be used by other frontend applications.

Amul Sul, reviewed by Sravan Kumar, Andres Freund (whose input
I specifically solicited regarding the meson.build changes),
and me.

Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com
This commit is contained in:
Robert Haas
2024-08-05 11:40:29 -04:00
parent 53b2c921a0
commit f80b09bac8
12 changed files with 18 additions and 18 deletions

View File

@@ -21,6 +21,11 @@ override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
OBJS = \
archive.o \
astreamer_file.o \
astreamer_gzip.o \
astreamer_lz4.o \
astreamer_tar.o \
astreamer_zstd.o \
cancel.o \
conditional.o \
connect_utils.o \

View File

@@ -0,0 +1,396 @@
/*-------------------------------------------------------------------------
*
* astreamer_file.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/fe_utils/astreamer_file.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#include "fe_utils/astreamer.h"
/*
 * State for an astreamer that writes everything it receives to one file.
 */
typedef struct astreamer_plain_writer
{
astreamer base; /* generic part; callbacks cast astreamer * back to this struct */
char *pathname; /* private copy of the path, used in error messages */
FILE *file; /* stream the data is written to */
bool should_close_file; /* true only if we opened 'file' ourselves */
} astreamer_plain_writer;
/*
 * State for an astreamer that unpacks archive members into the filesystem.
 */
typedef struct astreamer_extractor
{
astreamer base; /* generic part; callbacks cast astreamer * back to this struct */
char *basepath; /* private copy of the directory prepended to member names */
const char *(*link_map) (const char *); /* optional rewrite of symlink targets */
void (*report_output_file) (const char *); /* optional per-member callback */
char filename[MAXPGPATH]; /* full path of the member being processed */
FILE *file; /* open only while a regular-file member is in progress */
} astreamer_extractor;
/*
 * Callbacks implementing the plain-file writer, and the ops table that
 * astreamer_plain_writer_new() installs in each new writer.
 */
static void astreamer_plain_writer_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_plain_writer_finalize(astreamer *streamer);
static void astreamer_plain_writer_free(astreamer *streamer);
static const astreamer_ops astreamer_plain_writer_ops = {
.content = astreamer_plain_writer_content,
.finalize = astreamer_plain_writer_finalize,
.free = astreamer_plain_writer_free
};
/*
 * Callbacks implementing the extractor, the filesystem helpers they rely on,
 * and the ops table that astreamer_extractor_new() installs.
 */
static void astreamer_extractor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_extractor_finalize(astreamer *streamer);
static void astreamer_extractor_free(astreamer *streamer);
static void extract_directory(const char *filename, mode_t mode);
static void extract_link(const char *filename, const char *linktarget);
static FILE *create_file_for_extract(const char *filename, mode_t mode);
static const astreamer_ops astreamer_extractor_ops = {
.content = astreamer_extractor_content,
.finalize = astreamer_extractor_finalize,
.free = astreamer_extractor_free
};
/*
 * Build an astreamer whose only job is to write incoming data to a file.
 *
 * 'pathname' is required and is what error messages always refer to. 'file'
 * is optional: when it is NULL, the file named by pathname is opened for
 * writing here and closed again at finalize time; when it is non-NULL, data
 * is written to that stream and this streamer never closes it.
 */
astreamer *
astreamer_plain_writer_new(char *pathname, FILE *file)
{
	astreamer_plain_writer *writer = palloc0(sizeof(astreamer_plain_writer));

	*((const astreamer_ops **) &writer->base.bbs_ops) =
		&astreamer_plain_writer_ops;
	writer->pathname = pstrdup(pathname);

	if (file != NULL)
	{
		/* Caller-supplied stream; should_close_file stays false. */
		writer->file = file;
	}
	else
	{
		writer->file = fopen(pathname, "wb");
		if (writer->file == NULL)
			pg_fatal("could not create file \"%s\": %m", pathname);
		writer->should_close_file = true;
	}

	return &writer->base;
}
/*
 * Append one chunk of archive data to the output file.
 *
 * 'member' and 'context' are not consulted; the bytes are written verbatim.
 * A write failure is fatal.
 */
static void
astreamer_plain_writer_content(astreamer *streamer,
							   astreamer_member *member, const char *data,
							   int len, astreamer_archive_context context)
{
	astreamer_plain_writer *writer = (astreamer_plain_writer *) streamer;

	/* An empty chunk needs no I/O at all. */
	if (len != 0)
	{
		errno = 0;
		if (fwrite(data, len, 1, writer->file) != 1)
		{
			/* if write didn't set errno, assume problem is no disk space */
			if (errno == 0)
				errno = ENOSPC;
			pg_fatal("could not write to file \"%s\": %m",
					 writer->pathname);
		}
	}
}
/*
 * End-of-archive processing for a plain-file writer.
 *
 * The file is closed only if this streamer opened it; a stream supplied by
 * the caller is left open. Either way the streamer forgets the handle.
 */
static void
astreamer_plain_writer_finalize(astreamer *streamer)
{
	astreamer_plain_writer *writer = (astreamer_plain_writer *) streamer;

	if (writer->should_close_file)
	{
		if (fclose(writer->file) != 0)
			pg_fatal("could not close file \"%s\": %m",
					 writer->pathname);
	}

	writer->should_close_file = false;
	writer->file = NULL;
}
/*
 * Release the memory held by a plain-file writer.
 *
 * The streamer is expected to have been finalized already (so there is no
 * file left for us to close) and to have no successor.
 */
static void
astreamer_plain_writer_free(astreamer *streamer)
{
	astreamer_plain_writer *writer = (astreamer_plain_writer *) streamer;

	Assert(writer->base.bbs_next == NULL);
	Assert(!writer->should_close_file);

	pfree(writer->pathname);
	pfree(writer);
}
/*
 * Build an astreamer that extracts an archive into the filesystem.
 *
 * Every pathname found in the archive is taken to be relative to 'basepath'.
 *
 * In contrast to astreamer_plain_writer_new(), untyped chunks are of no use
 * here: the input must consist of typed chunks obeying the rules laid out in
 * astreamer.h. Given that, the original archive format is irrelevant; the
 * member information alone tells us which file to write.
 *
 * 'link_map', if not NULL, is applied to the target of each symbolic link
 * and must return the pathname to use instead. If NULL, link targets are
 * used unchanged.
 *
 * 'report_output_file', if not NULL, is called with the pathname of each new
 * output file as it is opened. If NULL, no such call is made.
 */
astreamer *
astreamer_extractor_new(const char *basepath,
						const char *(*link_map) (const char *),
						void (*report_output_file) (const char *))
{
	astreamer_extractor *extractor = palloc0(sizeof(astreamer_extractor));

	*((const astreamer_ops **) &extractor->base.bbs_ops) =
		&astreamer_extractor_ops;

	extractor->report_output_file = report_output_file;
	extractor->link_map = link_map;
	extractor->basepath = pstrdup(basepath);

	return &extractor->base;
}
/*
 * Extract archive contents to the filesystem.
 *
 * The action taken depends on 'context':
 *
 * - member header: build the output path from basepath and the member's
 *   pathname, then create the directory, symbolic link, or regular file.
 *   Only regular files leave mystreamer->file open.
 * - member contents: append the data to the open file, if there is one.
 * - member trailer: close the open file, if there is one.
 * - archive trailer: nothing to do.
 *
 * Any failure to create, write, or close an output file is fatal.
 */
static void
astreamer_extractor_content(astreamer *streamer, astreamer_member *member,
							const char *data, int len,
							astreamer_archive_context context)
{
	astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
	int			fnamelen;

	Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER);
	Assert(context != ASTREAMER_UNKNOWN);

	switch (context)
	{
		case ASTREAMER_MEMBER_HEADER:
			Assert(mystreamer->file == NULL);

			/* Prepend basepath. */
			snprintf(mystreamer->filename, sizeof(mystreamer->filename),
					 "%s/%s", mystreamer->basepath, member->pathname);

			/* Remove any trailing slash. */
			fnamelen = strlen(mystreamer->filename);
			if (mystreamer->filename[fnamelen - 1] == '/')
				mystreamer->filename[fnamelen - 1] = '\0';

			/* Dispatch based on file type. */
			if (member->is_directory)
				extract_directory(mystreamer->filename, member->mode);
			else if (member->is_link)
			{
				const char *linktarget = member->linktarget;

				if (mystreamer->link_map)
					linktarget = mystreamer->link_map(linktarget);
				extract_link(mystreamer->filename, linktarget);
			}
			else
				mystreamer->file =
					create_file_for_extract(mystreamer->filename,
											member->mode);

			/* Report output file change. */
			if (mystreamer->report_output_file)
				mystreamer->report_output_file(mystreamer->filename);
			break;

		case ASTREAMER_MEMBER_CONTENTS:
			/* Directories and links have no open file; skip their data. */
			if (mystreamer->file == NULL)
				break;

			errno = 0;
			if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1)
			{
				/* if write didn't set errno, assume problem is no disk space */
				if (errno == 0)
					errno = ENOSPC;
				pg_fatal("could not write to file \"%s\": %m",
						 mystreamer->filename);
			}
			break;

		case ASTREAMER_MEMBER_TRAILER:
			if (mystreamer->file == NULL)
				break;

			/*
			 * fclose() flushes whatever stdio still has buffered, so this is
			 * where a deferred write failure shows up. Ignoring the result
			 * could leave a silently truncated file behind.
			 */
			if (fclose(mystreamer->file) != 0)
				pg_fatal("could not close file \"%s\": %m",
						 mystreamer->filename);
			mystreamer->file = NULL;
			break;

		case ASTREAMER_ARCHIVE_TRAILER:
			break;

		default:
			/* Shouldn't happen. */
			pg_fatal("unexpected state while extracting archive");
	}
}
/*
 * Decide whether finding this directory already in place is acceptable.
 *
 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been
 * created by the wal receiver process. Also, when the WAL directory location
 * was specified, pg_wal (or pg_xlog) has already been created as a symbolic
 * link before starting the actual backup. So creation failures on those
 * directories and their known subdirectories are ignored.
 *
 * If in-place tablespaces are used, pg_tblspc and its subdirectories may
 * already exist when we get here, so that case is tolerated as well. A
 * subdirectory qualifies when its name is all digits and it sits directly
 * under the first "/pg_tblspc/" found in the path.
 */
static bool
should_allow_existing_directory(const char *pathname)
{
	static const char *const tolerated_names[] = {
		"pg_wal",
		"pg_xlog",
		"archive_status",
		"summaries",
		"pg_tblspc"
	};
	static const char tblspc_prefix[] = "/pg_tblspc/";
	const char *filename = last_dir_separator(pathname) + 1;
	const char *found;
	size_t		i;

	for (i = 0; i < sizeof(tolerated_names) / sizeof(tolerated_names[0]); i++)
	{
		if (strcmp(filename, tolerated_names[i]) == 0)
			return true;
	}

	/* Anything else must be an all-digits name to be considered. */
	if (strspn(filename, "0123456789") != strlen(filename))
		return false;

	found = strstr(pathname, tblspc_prefix);
	if (found == NULL)
		return false;

	/* The last path component must immediately follow "/pg_tblspc/". */
	return found + strlen(tblspc_prefix) == filename;
}
/*
 * Create a directory and, except on Windows, give it the requested mode.
 *
 * An already-existing directory is accepted only when
 * should_allow_existing_directory() approves of it; any other failure is
 * fatal.
 */
static void
extract_directory(const char *filename, mode_t mode)
{
	if (mkdir(filename, pg_dir_create_mode) != 0)
	{
		if (errno != EEXIST || !should_allow_existing_directory(filename))
			pg_fatal("could not create directory \"%s\": %m",
					 filename);
	}

#ifndef WIN32
	if (chmod(filename, mode) != 0)
		pg_fatal("could not set permissions on directory \"%s\": %m",
				 filename);
#endif
}
/*
 * Create a symbolic link named 'filename' pointing at 'linktarget'.
 *
 * Most likely this is a link inside the pg_tblspc directory that points to a
 * tablespace location. Any tablespace mapping given on the command line
 * (--tablespace-mapping) has been applied to the target. (The mapping is
 * applied blindly, without checking that the link really lives in pg_tblspc.
 * Other symlinks are not expected in a data directory, but if some exist,
 * being able to map them too can be called an undocumented feature.)
 */
static void
extract_link(const char *filename, const char *linktarget)
{
	int			rc = symlink(linktarget, filename);

	if (rc != 0)
		pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m",
				 filename, linktarget);
}
/*
 * Create a regular file and, except on Windows, give it the requested mode.
 *
 * The open handle is returned so that the caller can write the member's
 * contents to it. Failure to create the file or set its mode is fatal.
 */
static FILE *
create_file_for_extract(const char *filename, mode_t mode)
{
	FILE	   *fp = fopen(filename, "wb");

	if (fp == NULL)
		pg_fatal("could not create file \"%s\": %m", filename);

#ifndef WIN32
	if (chmod(filename, mode) != 0)
		pg_fatal("could not set permissions on file \"%s\": %m",
				 filename);
#endif

	return fp;
}
/*
 * End-of-stream processing for extracting an archive.
 *
 * No work is required; we only verify that no output file was left open.
 */
static void
astreamer_extractor_finalize(astreamer *streamer)
{
	Assert(((astreamer_extractor *) streamer)->file == NULL);
}
/*
 * Release the extractor and its private copy of the base path.
 */
static void
astreamer_extractor_free(astreamer *streamer)
{
	astreamer_extractor *extractor = (astreamer_extractor *) streamer;
	char	   *basepath = extractor->basepath;

	pfree(basepath);
	pfree(extractor);
}

View File

@@ -0,0 +1,364 @@
/*-------------------------------------------------------------------------
*
* astreamer_gzip.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/fe_utils/astreamer_gzip.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#include "fe_utils/astreamer.h"
#ifdef HAVE_LIBZ
/*
 * State for an astreamer that gzip-compresses its input into a file.
 */
typedef struct astreamer_gzip_writer
{
astreamer base; /* generic part; callbacks cast astreamer * back to this struct */
char *pathname; /* private copy of the path, used in error messages */
gzFile gzfile; /* zlib handle the compressed data is written through */
} astreamer_gzip_writer;
/*
 * State for an astreamer that inflates gzip data and passes the result to
 * the next streamer.
 */
typedef struct astreamer_gzip_decompressor
{
astreamer base; /* generic part; base.bbs_buffer is the output buffer */
z_stream zstream; /* zlib inflate state */
size_t bytes_written; /* bytes in bbs_buffer not yet forwarded */
} astreamer_gzip_decompressor;
/*
 * Callbacks implementing the gzip writer, its error-text helper, and the ops
 * table that astreamer_gzip_writer_new() installs.
 */
static void astreamer_gzip_writer_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_gzip_writer_finalize(astreamer *streamer);
static void astreamer_gzip_writer_free(astreamer *streamer);
static const char *get_gz_error(gzFile gzf);
static const astreamer_ops astreamer_gzip_writer_ops = {
.content = astreamer_gzip_writer_content,
.finalize = astreamer_gzip_writer_finalize,
.free = astreamer_gzip_writer_free
};
/*
 * Callbacks implementing the gzip decompressor, the allocator shims handed
 * to zlib, and the ops table that astreamer_gzip_decompressor_new() installs.
 */
static void astreamer_gzip_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
static void astreamer_gzip_decompressor_free(astreamer *streamer);
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
static void gzip_pfree(void *opaque, void *address);
static const astreamer_ops astreamer_gzip_decompressor_ops = {
.content = astreamer_gzip_decompressor_content,
.finalize = astreamer_gzip_decompressor_finalize,
.free = astreamer_gzip_decompressor_free
};
#endif
/*
 * Build an astreamer that gzip-compresses its input and writes the result
 * to a file.
 *
 * As with astreamer_plain_writer_new(), 'pathname' is always what error
 * messages refer to. If 'file' is NULL, pathname also names the file that is
 * opened for the compressed output. Otherwise the output goes to a duplicate
 * of file's descriptor, so that closing the gzip handle later does not close
 * the caller's stream.
 *
 * The compression level is taken from 'compress'. In a build without zlib
 * this function fails with a fatal error.
 */
astreamer *
astreamer_gzip_writer_new(char *pathname, FILE *file,
						  pg_compress_specification *compress)
{
#ifdef HAVE_LIBZ
	astreamer_gzip_writer *writer = palloc0(sizeof(astreamer_gzip_writer));
	gzFile		gz;

	*((const astreamer_ops **) &writer->base.bbs_ops) =
		&astreamer_gzip_writer_ops;
	writer->pathname = pstrdup(pathname);

	if (file != NULL)
	{
		int			dupfd = dup(fileno(file));

		if (dupfd < 0)
			pg_fatal("could not duplicate stdout: %m");

		gz = gzdopen(dupfd, "wb");
		if (gz == NULL)
			pg_fatal("could not open output file: %m");
	}
	else
	{
		gz = gzopen(pathname, "wb");
		if (gz == NULL)
			pg_fatal("could not create compressed file \"%s\": %m",
					 pathname);
	}
	writer->gzfile = gz;

	if (gzsetparams(gz, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
		pg_fatal("could not set compression level %d: %s",
				 compress->level, get_gz_error(gz));

	return &writer->base;
#else
	pg_fatal("this build does not support compression with %s", "gzip");
	return NULL;				/* keep compiler quiet */
#endif
}
#ifdef HAVE_LIBZ
/*
 * Compress one chunk of archive data into the gzip file.
 *
 * 'member' and 'context' are not consulted. A short or failed write is
 * fatal.
 */
static void
astreamer_gzip_writer_content(astreamer *streamer,
							  astreamer_member *member, const char *data,
							  int len, astreamer_archive_context context)
{
	astreamer_gzip_writer *writer = (astreamer_gzip_writer *) streamer;

	/* An empty chunk needs no I/O at all. */
	if (len != 0)
	{
		errno = 0;
		if (gzwrite(writer->gzfile, data, len) != len)
		{
			/* if write didn't set errno, assume problem is no disk space */
			if (errno == 0)
				errno = ENOSPC;
			pg_fatal("could not write to compressed file \"%s\": %s",
					 writer->pathname, get_gz_error(writer->gzfile));
		}
	}
}
/*
 * End-of-archive processing for a gzip writer: just call gzclose.
 *
 * Who opened the file makes no difference here, because libz offers no way
 * to avoid closing the underlying file handle. astreamer_gzip_writer_new()
 * works around that by using dup(), so from the caller's point of view the
 * behavior matches astreamer_plain_writer.
 */
static void
astreamer_gzip_writer_finalize(astreamer *streamer)
{
	astreamer_gzip_writer *writer = (astreamer_gzip_writer *) streamer;
	int			rc;

	errno = 0;					/* in case gzclose() doesn't set it */
	rc = gzclose(writer->gzfile);
	if (rc != 0)
		pg_fatal("could not close compressed file \"%s\": %m",
				 writer->pathname);

	writer->gzfile = NULL;
}
/*
 * Release the memory held by a gzip writer.
 *
 * The streamer is expected to have been finalized already (gzfile reset to
 * NULL) and to have no successor.
 */
static void
astreamer_gzip_writer_free(astreamer *streamer)
{
	astreamer_gzip_writer *writer = (astreamer_gzip_writer *) streamer;

	Assert(writer->gzfile == NULL);
	Assert(writer->base.bbs_next == NULL);

	pfree(writer->pathname);
	pfree(writer);
}
/*
 * Return a printable description of the last error on a gzFile.
 *
 * When libz reports Z_ERRNO, the text for the current errno is returned;
 * otherwise libz's own message is returned.
 */
static const char *
get_gz_error(gzFile gzf)
{
	int			zerrnum;
	const char *zmsg = gzerror(gzf, &zerrnum);

	return (zerrnum == Z_ERRNO) ? strerror(errno) : zmsg;
}
#endif
/*
 * Build an astreamer that decompresses gzip data and hands the result to
 * 'next', which must not be NULL.
 *
 * In a build without zlib this function fails with a fatal error.
 */
astreamer *
astreamer_gzip_decompressor_new(astreamer *next)
{
#ifdef HAVE_LIBZ
	astreamer_gzip_decompressor *decomp;
	z_stream   *zs;

	Assert(next != NULL);

	decomp = palloc0(sizeof(astreamer_gzip_decompressor));
	*((const astreamer_ops **) &decomp->base.bbs_ops) =
		&astreamer_gzip_decompressor_ops;
	decomp->base.bbs_next = next;
	initStringInfo(&decomp->base.bbs_buffer);

	/* Set up zlib: our allocator shims, and bbs_buffer as the output area. */
	zs = &decomp->zstream;
	zs->zalloc = gzip_palloc;
	zs->zfree = gzip_pfree;
	zs->next_out = (uint8 *) decomp->base.bbs_buffer.data;
	zs->avail_out = decomp->base.bbs_buffer.maxlen;

	/*
	 * The data was compressed using deflateInit2 to request a gzip header,
	 * so inflateInit2 is used here to match.
	 *
	 * Per the documentation for inflateInit2, the second argument is
	 * "windowBits" and must be at least as large as the value used when
	 * compressing, so the maximum possible value is used for safety.
	 */
	if (inflateInit2(zs, 15 + 16) != Z_OK)
		pg_fatal("could not initialize compression library");

	return &decomp->base;
#else
	pg_fatal("this build does not support compression with %s", "gzip");
	return NULL;				/* keep compiler quiet */
#endif
}
#ifdef HAVE_LIBZ
/*
 * Decompress the input data to the output buffer until we run out of input
 * data. Each time the output buffer is full, pass on the decompressed data
 * to the next streamer.
 *
 * Output that does not fill the buffer stays there (tracked by
 * bytes_written) until a later call or finalize forwards it. Any result from
 * inflate() other than Z_OK or Z_STREAM_END is fatal: continuing would
 * forward bad data or, when inflate() makes no progress, loop forever.
 */
static void
astreamer_gzip_decompressor_content(astreamer *streamer,
									astreamer_member *member,
									const char *data, int len,
									astreamer_archive_context context)
{
	astreamer_gzip_decompressor *mystreamer;
	z_stream   *zs;

	mystreamer = (astreamer_gzip_decompressor *) streamer;

	zs = &mystreamer->zstream;
	zs->next_in = (const uint8 *) data;
	zs->avail_in = len;

	/* Process the current chunk */
	while (zs->avail_in > 0)
	{
		int			res;

		Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);

		/* Resume output right after whatever is already buffered. */
		zs->next_out = (uint8 *)
			mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
		zs->avail_out =
			mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;

		/*
		 * This call decompresses data starting at zs->next_in and updates
		 * zs->next_in and zs->avail_in. It generates output data starting at
		 * zs->next_out and updates zs->next_out and zs->avail_out
		 * accordingly.
		 */
		res = inflate(zs, Z_NO_FLUSH);

		if (res != Z_OK && res != Z_STREAM_END)
			pg_fatal("could not decompress data: %s",
					 zs->msg ? zs->msg : "unknown error");

		mystreamer->bytes_written =
			mystreamer->base.bbs_buffer.maxlen - zs->avail_out;

		/* If output buffer is full then pass data to next streamer */
		if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
		{
			astreamer_content(mystreamer->base.bbs_next, member,
							  mystreamer->base.bbs_buffer.data,
							  mystreamer->base.bbs_buffer.maxlen, context);
			mystreamer->bytes_written = 0;
		}

		/*
		 * Once the end of the compressed stream has been reached, further
		 * inflate() calls consume no more input, so stop here rather than
		 * spin on any bytes that follow the stream.
		 */
		if (res == Z_STREAM_END)
			break;
	}
}
/*
 * End-of-stream processing.
 *
 * Forward whatever decompressed data is still waiting in the output buffer,
 * then finalize the next streamer.
 */
static void
astreamer_gzip_decompressor_finalize(astreamer *streamer)
{
	astreamer_gzip_decompressor *mystreamer;

	mystreamer = (astreamer_gzip_decompressor *) streamer;

	/*
	 * Only the first bytes_written bytes of the buffer are pending output;
	 * anything beyond that is left over from an earlier, already-forwarded
	 * buffer fill and must not be sent again.
	 */
	if (mystreamer->bytes_written > 0)
	{
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->base.bbs_buffer.data,
						  mystreamer->bytes_written,
						  ASTREAMER_UNKNOWN);
		mystreamer->bytes_written = 0;
	}

	astreamer_finalize(mystreamer->base.bbs_next);
}
/*
 * Free memory.
 *
 * Frees the next streamer first, then zlib's inflate state, the output
 * buffer, and finally the streamer itself.
 */
static void
astreamer_gzip_decompressor_free(astreamer *streamer)
{
	astreamer_gzip_decompressor *mystreamer;

	mystreamer = (astreamer_gzip_decompressor *) streamer;

	astreamer_free(streamer->bbs_next);

	/* Release the state set up by inflateInit2(); it is not part of *streamer. */
	inflateEnd(&mystreamer->zstream);

	pfree(streamer->bbs_buffer.data);
	pfree(streamer);
}
/*
 * Allocation callback for libz: adapts palloc to the signature libz expects.
 * 'opaque' is unused.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	unsigned	nbytes = items * size;

	(void) opaque;
	return palloc(nbytes);
}
/*
 * Deallocation callback for libz: adapts pfree to the signature libz
 * expects. 'opaque' is unused.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;
	pfree(address);
}
#endif

View File

@@ -0,0 +1,422 @@
/*-------------------------------------------------------------------------
*
* astreamer_lz4.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/fe_utils/astreamer_lz4.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#ifdef USE_LZ4
#include <lz4frame.h>
#endif
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#include "fe_utils/astreamer.h"
#ifdef USE_LZ4
/*
 * State shared by the LZ4 compressor and decompressor streamers. Each kind
 * of streamer creates and uses only its own context.
 */
typedef struct astreamer_lz4_frame
{
astreamer base; /* generic part; base.bbs_buffer is the output buffer */
LZ4F_compressionContext_t cctx; /* created by the compressor constructor */
LZ4F_decompressionContext_t dctx; /* created by the decompressor constructor */
LZ4F_preferences_t prefs; /* compression preferences (block size, level) */
size_t bytes_written; /* bytes in bbs_buffer not yet forwarded */
bool header_written; /* has the compressor emitted the frame header? */
} astreamer_lz4_frame;
/*
 * Callbacks implementing the LZ4 compressor, and the ops table that
 * astreamer_lz4_compressor_new() installs.
 */
static void astreamer_lz4_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_lz4_compressor_finalize(astreamer *streamer);
static void astreamer_lz4_compressor_free(astreamer *streamer);
static const astreamer_ops astreamer_lz4_compressor_ops = {
.content = astreamer_lz4_compressor_content,
.finalize = astreamer_lz4_compressor_finalize,
.free = astreamer_lz4_compressor_free
};
/*
 * Callbacks implementing the LZ4 decompressor, and the ops table that
 * astreamer_lz4_decompressor_new() installs.
 */
static void astreamer_lz4_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
static void astreamer_lz4_decompressor_free(astreamer *streamer);
static const astreamer_ops astreamer_lz4_decompressor_ops = {
.content = astreamer_lz4_decompressor_content,
.finalize = astreamer_lz4_decompressor_finalize,
.free = astreamer_lz4_decompressor_free
};
#endif
/*
 * Create a new base backup streamer that performs lz4 compression of tar
 * blocks.
 *
 * 'next' receives the compressed output and must not be NULL. The
 * compression level is taken from 'compress'. Failure to create the LZ4
 * compression context is fatal, as is calling this in a build without LZ4.
 */
astreamer *
astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_LZ4
	astreamer_lz4_frame *streamer;
	LZ4F_errorCode_t ctxError;
	LZ4F_preferences_t *prefs;

	Assert(next != NULL);

	streamer = palloc0(sizeof(astreamer_lz4_frame));
	*((const astreamer_ops **) &streamer->base.bbs_ops) =
		&astreamer_lz4_compressor_ops;

	streamer->base.bbs_next = next;
	initStringInfo(&streamer->base.bbs_buffer);
	streamer->header_written = false;

	/* Initialize stream compression preferences */
	prefs = &streamer->prefs;
	memset(prefs, 0, sizeof(LZ4F_preferences_t));
	prefs->frameInfo.blockSizeID = LZ4F_max256KB;
	prefs->compressionLevel = compress->level;

	/*
	 * Without a compression context every later LZ4F call would fail, so
	 * give up right away instead of returning an unusable streamer.
	 */
	ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
	if (LZ4F_isError(ctxError))
		pg_fatal("could not create lz4 compression context: %s",
				 LZ4F_getErrorName(ctxError));

	return &streamer->base;
#else
	pg_fatal("this build does not support compression with %s", "LZ4");
	return NULL;				/* keep compiler quiet */
#endif
}
#ifdef USE_LZ4
/*
 * Compress the input data to output buffer.
 *
 * Find out the compression bound based on input data length for each
 * invocation to make sure that output buffer has enough capacity to
 * accommodate the compressed data. If the output buffer capacity falls short
 * of the compression bound, forward the content of the output buffer to the
 * next streamer and empty the buffer.
 *
 * Any error reported by the LZ4 library is fatal: the value it returns is an
 * error code, not a byte count, and must not be added to bytes_written.
 */
static void
astreamer_lz4_compressor_content(astreamer *streamer,
								 astreamer_member *member,
								 const char *data, int len,
								 astreamer_archive_context context)
{
	astreamer_lz4_frame *mystreamer;
	uint8	   *next_in,
			   *next_out;
	size_t		out_bound,
				compressed_size,
				avail_out;

	mystreamer = (astreamer_lz4_frame *) streamer;
	next_in = (uint8 *) data;

	/* Write header before processing the first input chunk. */
	if (!mystreamer->header_written)
	{
		compressed_size = LZ4F_compressBegin(mystreamer->cctx,
											 (uint8 *) mystreamer->base.bbs_buffer.data,
											 mystreamer->base.bbs_buffer.maxlen,
											 &mystreamer->prefs);

		if (LZ4F_isError(compressed_size))
			pg_fatal("could not write lz4 header: %s",
					 LZ4F_getErrorName(compressed_size));

		mystreamer->bytes_written += compressed_size;
		mystreamer->header_written = true;
	}

	/*
	 * Update the offset and capacity of output buffer based on number of
	 * bytes written to output buffer.
	 */
	next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
	avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;

	/*
	 * Find out the compression bound and make sure that output buffer has the
	 * required capacity for the success of LZ4F_compressUpdate. If needed
	 * forward the content to next streamer and empty the buffer.
	 */
	out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
	if (avail_out < out_bound)
	{
		astreamer_content(mystreamer->base.bbs_next, member,
						  mystreamer->base.bbs_buffer.data,
						  mystreamer->bytes_written,
						  context);

		/* Enlarge buffer if it falls short of out bound. */
		if (mystreamer->base.bbs_buffer.maxlen < out_bound)
			enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);

		avail_out = mystreamer->base.bbs_buffer.maxlen;
		mystreamer->bytes_written = 0;
		next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
	}

	/*
	 * This call compresses the data starting at next_in and generates the
	 * output starting at next_out. It expects the caller to provide the size
	 * of input buffer and capacity of output buffer by providing parameters
	 * len and avail_out.
	 *
	 * It returns the number of bytes compressed to output buffer.
	 */
	compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
										  next_out, avail_out,
										  next_in, len, NULL);

	if (LZ4F_isError(compressed_size))
		pg_fatal("could not compress data: %s",
				 LZ4F_getErrorName(compressed_size));

	mystreamer->bytes_written += compressed_size;
}
/*
 * End-of-stream processing.
 *
 * Make room for the frame footer (forwarding buffered output first if
 * necessary), end the LZ4 frame, forward the remaining output, and finalize
 * the next streamer. Failure to end the frame is fatal, because the value
 * returned is then an error code rather than a byte count.
 */
static void
astreamer_lz4_compressor_finalize(astreamer *streamer)
{
	astreamer_lz4_frame *mystreamer;
	uint8	   *next_out;
	size_t		footer_bound,
				compressed_size,
				avail_out;

	mystreamer = (astreamer_lz4_frame *) streamer;

	/* Find out the footer bound and update the output buffer. */
	footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
	if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
		footer_bound)
	{
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->base.bbs_buffer.data,
						  mystreamer->bytes_written,
						  ASTREAMER_UNKNOWN);

		/* Enlarge buffer if it falls short of footer bound. */
		if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
			enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);

		avail_out = mystreamer->base.bbs_buffer.maxlen;
		mystreamer->bytes_written = 0;
		next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
	}
	else
	{
		next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
		avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
	}

	/*
	 * Finalize the frame and flush whatever data remaining in compression
	 * context.
	 */
	compressed_size = LZ4F_compressEnd(mystreamer->cctx,
									   next_out, avail_out, NULL);

	if (LZ4F_isError(compressed_size))
		pg_fatal("could not end lz4 compression: %s",
				 LZ4F_getErrorName(compressed_size));

	mystreamer->bytes_written += compressed_size;

	astreamer_content(mystreamer->base.bbs_next, NULL,
					  mystreamer->base.bbs_buffer.data,
					  mystreamer->bytes_written,
					  ASTREAMER_UNKNOWN);

	astreamer_finalize(mystreamer->base.bbs_next);
}
/*
 * Release an LZ4 compressor: the next streamer first, then the LZ4
 * compression context, the output buffer, and the streamer itself.
 */
static void
astreamer_lz4_compressor_free(astreamer *streamer)
{
	astreamer_lz4_frame *frame = (astreamer_lz4_frame *) streamer;

	astreamer_free(frame->base.bbs_next);
	LZ4F_freeCompressionContext(frame->cctx);
	pfree(frame->base.bbs_buffer.data);
	pfree(frame);
}
#endif
/*
 * Build an astreamer that decompresses lz4 data and hands the result to
 * 'next', which must not be NULL.
 *
 * Failure to create the LZ4 decompression context is fatal, as is calling
 * this in a build without LZ4.
 */
astreamer *
astreamer_lz4_decompressor_new(astreamer *next)
{
#ifdef USE_LZ4
	astreamer_lz4_frame *frame;
	LZ4F_errorCode_t rc;

	Assert(next != NULL);

	frame = palloc0(sizeof(astreamer_lz4_frame));
	*((const astreamer_ops **) &frame->base.bbs_ops) =
		&astreamer_lz4_decompressor_ops;
	frame->base.bbs_next = next;
	initStringInfo(&frame->base.bbs_buffer);

	/* Set up the library state used for decompression. */
	rc = LZ4F_createDecompressionContext(&frame->dctx, LZ4F_VERSION);
	if (LZ4F_isError(rc))
		pg_fatal("could not initialize compression library: %s",
				 LZ4F_getErrorName(rc));

	return &frame->base;
#else
	pg_fatal("this build does not support compression with %s", "LZ4");
	return NULL;				/* keep compiler quiet */
#endif
}
#ifdef USE_LZ4
/*
 * Decompress the input data to output buffer until we run out of input
 * data. Each time the output buffer is full, pass on the decompressed data
 * to the next streamer.
 *
 * Output that does not fill the buffer stays there, counted by
 * bytes_written, so each call resumes writing right after those bytes. A
 * decompression error is fatal.
 */
static void
astreamer_lz4_decompressor_content(astreamer *streamer,
								   astreamer_member *member,
								   const char *data, int len,
								   astreamer_archive_context context)
{
	astreamer_lz4_frame *mystreamer;
	uint8	   *next_in,
			   *next_out;
	size_t		avail_in,
				avail_out;

	mystreamer = (astreamer_lz4_frame *) streamer;
	next_in = (uint8 *) data;
	avail_in = len;

	/*
	 * Start after any output left in the buffer by a previous call;
	 * restarting at the beginning would overwrite data that has not been
	 * forwarded yet.
	 */
	next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
	avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;

	while (avail_in > 0)
	{
		size_t		ret,
					read_size,
					out_size;

		read_size = avail_in;
		out_size = avail_out;

		/*
		 * This call decompresses the data starting at next_in and generates
		 * the output data starting at next_out. It expects the caller to
		 * provide size of the input buffer and total capacity of the output
		 * buffer by providing the read_size and out_size parameters
		 * respectively.
		 *
		 * Per the documentation of LZ4, parameters read_size and out_size
		 * behave as dual parameters. On return, the number of bytes consumed
		 * from the input buffer will be written back to read_size and the
		 * number of bytes decompressed to output buffer will be written back
		 * to out_size respectively.
		 */
		ret = LZ4F_decompress(mystreamer->dctx,
							  next_out, &out_size,
							  next_in, &read_size, NULL);

		if (LZ4F_isError(ret))
			pg_fatal("could not decompress data: %s",
					 LZ4F_getErrorName(ret));

		/* Update input buffer based on number of bytes consumed */
		avail_in -= read_size;
		next_in += read_size;

		mystreamer->bytes_written += out_size;

		/*
		 * If output buffer is full then forward the content to next streamer
		 * and update the output buffer.
		 */
		if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
		{
			astreamer_content(mystreamer->base.bbs_next, member,
							  mystreamer->base.bbs_buffer.data,
							  mystreamer->base.bbs_buffer.maxlen,
							  context);

			avail_out = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->bytes_written = 0;
			next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
		}
		else
		{
			/*
			 * Advance only by what this call produced. next_out already
			 * accounts for earlier output, so adding the cumulative
			 * bytes_written would skip ahead and could run past the buffer.
			 */
			avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
			next_out += out_size;
		}
	}
}
/*
 * End-of-stream processing.
 *
 * Forward whatever decompressed data is still waiting in the output buffer,
 * then finalize the next streamer.
 */
static void
astreamer_lz4_decompressor_finalize(astreamer *streamer)
{
	astreamer_lz4_frame *mystreamer;

	mystreamer = (astreamer_lz4_frame *) streamer;

	/*
	 * Only the first bytes_written bytes of the buffer are pending output;
	 * anything beyond that is left over from an earlier, already-forwarded
	 * buffer fill and must not be sent again.
	 */
	if (mystreamer->bytes_written > 0)
	{
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->base.bbs_buffer.data,
						  mystreamer->bytes_written,
						  ASTREAMER_UNKNOWN);
		mystreamer->bytes_written = 0;
	}

	astreamer_finalize(mystreamer->base.bbs_next);
}
/*
 * Release an LZ4 decompressor: the next streamer first, then the LZ4
 * decompression context, the output buffer, and the streamer itself.
 */
static void
astreamer_lz4_decompressor_free(astreamer *streamer)
{
	astreamer_lz4_frame *frame = (astreamer_lz4_frame *) streamer;

	astreamer_free(frame->base.bbs_next);
	LZ4F_freeDecompressionContext(frame->dctx);
	pfree(frame->base.bbs_buffer.data);
	pfree(frame);
}
#endif

View File

@@ -0,0 +1,514 @@
/*-------------------------------------------------------------------------
*
* astreamer_tar.c
*
* This module implements three types of tar processing. A tar parser
* expects unlabelled chunks of data (i.e. ASTREAMER_UNKNOWN) and splits
* it into labelled chunks (any other value of astreamer_archive_context).
* A tar archiver does the reverse: it takes a bunch of labelled chunks
* and produces a tarfile, optionally replacing member headers and trailers
* so that upstream astreamer objects can perform surgery on the tarfile
* contents without knowing the details of the tar format. A tar terminator
* just adds two blocks of NUL bytes to the end of the file, since older
* server versions produce files with this terminator omitted.
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/fe_utils/astreamer_tar.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <time.h>
#include "common/logging.h"
#include "fe_utils/astreamer.h"
#include "pgtar.h"
/*
 * State for a tar parser.
 *
 * The parser is a small state machine: next_context says which kind of chunk
 * the upcoming input bytes belong to, and the remaining fields describe the
 * archive member currently being processed.
 */
typedef struct astreamer_tar_parser
{
/* generic astreamer state; its buffer accumulates headers and padding */
astreamer base;
/* kind of chunk expected next; starts as ASTREAMER_MEMBER_HEADER */
astreamer_archive_context next_context;
/* details of the current member, filled in from its tar header */
astreamer_member member;
/* bytes of the current member's contents already forwarded */
size_t file_bytes_sent;
/* padding bytes that follow the current member's contents */
size_t pad_bytes_expected;
} astreamer_tar_parser;
/*
 * State for a tar archiver.
 */
typedef struct astreamer_tar_archiver
{
/* generic astreamer state */
astreamer base;
/*
 * Set when a member header was regenerated; tells the archiver to also
 * regenerate that member's trailer (padding), after which it is cleared.
 */
bool rearchive_member;
} astreamer_tar_archiver;
/* Callbacks and callback table for the tar parser. */
static void astreamer_tar_parser_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_tar_parser_finalize(astreamer *streamer);
static void astreamer_tar_parser_free(astreamer *streamer);
/* Helper that parses one buffered tar header block. */
static bool astreamer_tar_header(astreamer_tar_parser *mystreamer);
static const astreamer_ops astreamer_tar_parser_ops = {
.content = astreamer_tar_parser_content,
.finalize = astreamer_tar_parser_finalize,
.free = astreamer_tar_parser_free
};
/* Callbacks and callback table for the tar archiver. */
static void astreamer_tar_archiver_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_tar_archiver_finalize(astreamer *streamer);
static void astreamer_tar_archiver_free(astreamer *streamer);
static const astreamer_ops astreamer_tar_archiver_ops = {
.content = astreamer_tar_archiver_content,
.finalize = astreamer_tar_archiver_finalize,
.free = astreamer_tar_archiver_free
};
/* Callbacks and callback table for the tar terminator. */
static void astreamer_tar_terminator_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_tar_terminator_finalize(astreamer *streamer);
static void astreamer_tar_terminator_free(astreamer *streamer);
static const astreamer_ops astreamer_tar_terminator_ops = {
.content = astreamer_tar_terminator_content,
.finalize = astreamer_tar_terminator_finalize,
.free = astreamer_tar_terminator_free
};
/*
 * Build an astreamer that parses the content passed to it as tar data.
 *
 * Feed it a sequence of ASTREAMER_UNKNOWN chunks; it hands typed chunks to
 * 'next', following the conventions described in astreamer.h.  The new
 * parser starts out expecting a member header.
 */
astreamer *
astreamer_tar_parser_new(astreamer *next)
{
	astreamer_tar_parser *parser = palloc0(sizeof(*parser));
	astreamer  *base = &parser->base;

	*((const astreamer_ops **) &base->bbs_ops) = &astreamer_tar_parser_ops;
	base->bbs_next = next;
	initStringInfo(&base->bbs_buffer);

	parser->next_context = ASTREAMER_MEMBER_HEADER;

	return base;
}
/*
 * Parse unknown content as tar data.
 *
 * The input must be unlabelled: 'member' is NULL and 'context' is
 * ASTREAMER_UNKNOWN.  The bytes are cut up according to
 * mystreamer->next_context into member headers, member contents, member
 * trailers (padding) and the archive trailer.  Member chunks are forwarded to
 * the next astreamer as soon as they are complete; archive trailer bytes are
 * only accumulated in our buffer here.
 *
 * Input may arrive in arbitrarily sized pieces, so headers and padding are
 * collected in the astreamer's buffer until they are complete.
 */
static void
astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
							 const char *data, int len,
							 astreamer_archive_context context)
{
	astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
	size_t		nbytes;

	/* Expect unparsed input. */
	Assert(member == NULL);
	Assert(context == ASTREAMER_UNKNOWN);

	while (len > 0)
	{
		switch (mystreamer->next_context)
		{
			case ASTREAMER_MEMBER_HEADER:

				/*
				 * If we're expecting an archive member header, accumulate a
				 * full block of data before doing anything further.
				 */
				if (!astreamer_buffer_until(streamer, &data, &len,
											TAR_BLOCK_SIZE))
					return;

				/*
				 * Now we can process the header and get ready to process the
				 * file contents; however, we might find out that what we
				 * thought was the next file header is actually the start of
				 * the archive trailer. Switch modes accordingly.
				 */
				if (astreamer_tar_header(mystreamer))
				{
					if (mystreamer->member.size == 0)
					{
						/* No content; trailer is zero-length. */
						astreamer_content(mystreamer->base.bbs_next,
										  &mystreamer->member,
										  NULL, 0,
										  ASTREAMER_MEMBER_TRAILER);

						/* Expect next header. */
						mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
					}
					else
					{
						/* Expect contents. */
						mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS;
					}
					mystreamer->base.bbs_buffer.len = 0;
					mystreamer->file_bytes_sent = 0;
				}
				else
				{
					/*
					 * All-zero block: keep it in the buffer, it is the start
					 * of the archive trailer.
					 */
					mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER;
				}
				break;

			case ASTREAMER_MEMBER_CONTENTS:

				/*
				 * Send as much content as we have, but not more than the
				 * remaining file length.
				 */
				Assert(mystreamer->file_bytes_sent < mystreamer->member.size);
				nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
				nbytes = Min(nbytes, len);
				Assert(nbytes > 0);
				astreamer_content(mystreamer->base.bbs_next,
								  &mystreamer->member,
								  data, nbytes,
								  ASTREAMER_MEMBER_CONTENTS);
				mystreamer->file_bytes_sent += nbytes;
				data += nbytes;
				len -= nbytes;

				/*
				 * If we've not yet sent the whole file, then there's more
				 * content to come; otherwise, it's time to expect the file
				 * trailer.
				 */
				Assert(mystreamer->file_bytes_sent <= mystreamer->member.size);
				if (mystreamer->file_bytes_sent == mystreamer->member.size)
				{
					if (mystreamer->pad_bytes_expected == 0)
					{
						/* Trailer is zero-length. */
						astreamer_content(mystreamer->base.bbs_next,
										  &mystreamer->member,
										  NULL, 0,
										  ASTREAMER_MEMBER_TRAILER);

						/* Expect next header. */
						mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
					}
					else
					{
						/* Trailer is not zero-length. */
						mystreamer->next_context = ASTREAMER_MEMBER_TRAILER;
					}
					mystreamer->base.bbs_buffer.len = 0;
				}
				break;

			case ASTREAMER_MEMBER_TRAILER:

				/*
				 * If we're expecting an archive member trailer, accumulate
				 * the expected number of padding bytes before sending
				 * anything onward.
				 */
				if (!astreamer_buffer_until(streamer, &data, &len,
											mystreamer->pad_bytes_expected))
					return;

				/*
				 * OK, now we can send it.  The padding was accumulated in our
				 * buffer (possibly across several input chunks), so send it
				 * from there, just as member headers are read from the
				 * buffer; 'data' does not reliably point at the padding once
				 * it has been buffered.
				 */
				astreamer_content(mystreamer->base.bbs_next,
								  &mystreamer->member,
								  mystreamer->base.bbs_buffer.data,
								  mystreamer->pad_bytes_expected,
								  ASTREAMER_MEMBER_TRAILER);

				/* Expect next file header. */
				mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
				mystreamer->base.bbs_buffer.len = 0;
				break;

			case ASTREAMER_ARCHIVE_TRAILER:

				/*
				 * We've seen an end-of-archive indicator, so anything more is
				 * buffered and sent as part of the archive trailer. But we
				 * don't expect more than 2 blocks.
				 *
				 * NOTE(review): astreamer_buffer_bytes is handed &len and
				 * asked to consume all 'len' bytes, so it looks like 'len'
				 * has already been reduced by the time it is tested below and
				 * this check cannot fire -- confirm against astreamer.h.
				 * Left as is, since enforcing the limit would change which
				 * inputs are accepted.
				 */
				astreamer_buffer_bytes(streamer, &data, &len, len);
				if (len > 2 * TAR_BLOCK_SIZE)
					pg_fatal("tar file trailer exceeds 2 blocks");
				return;

			default:
				/* Shouldn't happen. */
				pg_fatal("unexpected state while parsing tar archive");
		}
	}
}
/*
 * Interpret the tar block currently held in the parser's buffer as a member
 * header.
 *
 * Returns false, without forwarding anything, if the block consists solely
 * of zero bytes, which marks the archive trailer.  Otherwise the member
 * description in mystreamer->member is filled in from the header fields, the
 * number of padding bytes following the member is recorded, the raw header
 * block is forwarded to the next astreamer, and true is returned.
 */
static bool
astreamer_tar_header(astreamer_tar_parser *mystreamer)
{
	astreamer_member *member = &mystreamer->member;
	char	   *hdr = mystreamer->base.bbs_buffer.data;
	int			pos = 0;
	char		typeflag;

	Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);

	/* Skip over leading zero bytes; an all-zero block ends the archive. */
	while (pos < TAR_BLOCK_SIZE && hdr[pos] == '\0')
		pos++;
	if (pos == TAR_BLOCK_SIZE)
		return false;

	/* Member name; an empty one is not acceptable. */
	strlcpy(member->pathname, &hdr[TAR_OFFSET_NAME], MAXPGPATH);
	if (member->pathname[0] == '\0')
		pg_fatal("tar member has empty name");

	/* Numeric fields. */
	member->size = read_tar_number(&hdr[TAR_OFFSET_SIZE], 12);
	member->mode = read_tar_number(&hdr[TAR_OFFSET_MODE], 8);
	member->uid = read_tar_number(&hdr[TAR_OFFSET_UID], 8);
	member->gid = read_tar_number(&hdr[TAR_OFFSET_GID], 8);

	/* Member type, plus the link target for symbolic links. */
	typeflag = hdr[TAR_OFFSET_TYPEFLAG];
	member->is_directory = (typeflag == TAR_FILETYPE_DIRECTORY);
	member->is_link = (typeflag == TAR_FILETYPE_SYMLINK);
	if (member->is_link)
		strlcpy(member->linktarget, &hdr[TAR_OFFSET_LINKNAME], 100);

	/* Remember how much padding follows this member's contents. */
	mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);

	/* Pass the unmodified header block along. */
	astreamer_content(mystreamer->base.bbs_next, member,
					  hdr, TAR_BLOCK_SIZE,
					  ASTREAMER_MEMBER_HEADER);

	return true;
}
/*
 * End-of-stream processing for a tar parser.
 *
 * The stream may only end while we are collecting the archive trailer, or
 * exactly on a member boundary (expecting a header with nothing buffered);
 * anything else means the input was cut short.  Whatever trailer bytes were
 * buffered, possibly none, are sent on as the archive trailer before the
 * successor is finalized.
 */
static void
astreamer_tar_parser_finalize(astreamer *streamer)
{
	astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
	bool		in_archive_trailer;
	bool		on_member_boundary;

	in_archive_trailer =
		(mystreamer->next_context == ASTREAMER_ARCHIVE_TRAILER);
	on_member_boundary =
		(mystreamer->next_context == ASTREAMER_MEMBER_HEADER &&
		 mystreamer->base.bbs_buffer.len <= 0);

	if (!in_archive_trailer && !on_member_boundary)
		pg_fatal("COPY stream ended before last file was finished");

	/* The archive trailer is always sent, even when it is empty. */
	astreamer_content(streamer->bbs_next, NULL,
					  streamer->bbs_buffer.data, streamer->bbs_buffer.len,
					  ASTREAMER_ARCHIVE_TRAILER);

	/* Let the successor finish up as well. */
	astreamer_finalize(streamer->bbs_next);
}
/*
 * Free memory associated with a tar parser.
 *
 * Releases the parse buffer and the successor astreamer, and then the parser
 * object itself, which was allocated by astreamer_tar_parser_new.
 */
static void
astreamer_tar_parser_free(astreamer *streamer)
{
	pfree(streamer->bbs_buffer.data);
	astreamer_free(streamer->bbs_next);

	/* Like the other free callbacks in this file, release our own struct. */
	pfree(streamer);
}
/*
 * Build an astreamer that produces a tar archive.
 *
 * It can be used both to generate a brand-new archive and to rewrite one on
 * the fly.  Its input must consist of typed chunks (i.e. not
 * ASTREAMER_UNKNOWN); see astreamer_tar_archiver_content for how each kind
 * of chunk is treated.
 */
astreamer *
astreamer_tar_archiver_new(astreamer *next)
{
	astreamer_tar_archiver *archiver = palloc0(sizeof(*archiver));
	astreamer  *base = &archiver->base;

	*((const astreamer_ops **) &base->bbs_ops) = &astreamer_tar_archiver_ops;
	base->bbs_next = next;

	return base;
}
/*
 * Turn a stream of typed chunks into valid tar data.
 *
 * ASTREAMER_MEMBER_HEADER: a chunk of exactly TAR_BLOCK_SIZE bytes is passed
 * through untouched.  Any other length is expected to be zero (asserted) and
 * is replaced by a header built from 'member'; we also remember that this
 * member's trailer must be regenerated.
 *
 * ASTREAMER_MEMBER_TRAILER: if the header of this member was regenerated,
 * the trailer is replaced by the zero padding appropriate for member->size,
 * since the size may have changed.  Otherwise it is passed through.
 *
 * ASTREAMER_MEMBER_CONTENTS: always passed through.
 *
 * ASTREAMER_ARCHIVE_TRAILER: always replaced by two blocks of zero bytes.
 *
 * In every case exactly one chunk is forwarded to the next astreamer.
 */
static void
astreamer_tar_archiver_content(astreamer *streamer,
							   astreamer_member *member,
							   const char *data, int len,
							   astreamer_archive_context context)
{
	astreamer_tar_archiver *archiver = (astreamer_tar_archiver *) streamer;
	char		scratch[2 * TAR_BLOCK_SIZE];

	Assert(context != ASTREAMER_UNKNOWN);

	switch (context)
	{
		case ASTREAMER_MEMBER_HEADER:
			if (len != TAR_BLOCK_SIZE)
			{
				Assert(len == 0);

				/* Synthesize a header from the member description. */
				tarCreateHeader(scratch, member->pathname, NULL,
								member->size, member->mode,
								member->uid, member->gid,
								time(NULL));
				data = scratch;
				len = TAR_BLOCK_SIZE;

				/* The matching padding has to be rebuilt too. */
				archiver->rearchive_member = true;
			}
			break;

		case ASTREAMER_MEMBER_TRAILER:
			if (archiver->rearchive_member)
			{
				int			pad_bytes = tarPaddingBytesRequired(member->size);

				/* Fresh zero padding to go with the regenerated header. */
				memset(scratch, 0, pad_bytes);
				data = scratch;
				len = pad_bytes;

				/* Only once per regenerated header. */
				archiver->rearchive_member = false;
			}
			break;

		case ASTREAMER_ARCHIVE_TRAILER:
			/* The archive always ends with two zero-filled blocks. */
			memset(scratch, 0, 2 * TAR_BLOCK_SIZE);
			data = scratch;
			len = 2 * TAR_BLOCK_SIZE;
			break;

		default:
			/* Everything else goes through unchanged. */
			break;
	}

	astreamer_content(streamer->bbs_next, member, data, len, context);
}
/*
 * End-of-stream processing for a tar archiver: simply pass the end-of-stream
 * notification on to the successor.
 */
static void
astreamer_tar_archiver_finalize(astreamer *streamer)
{
	astreamer  *successor = streamer->bbs_next;

	astreamer_finalize(successor);
}
/*
 * Release a tar archiver: first its successor, then the archiver object.
 */
static void
astreamer_tar_archiver_free(astreamer *streamer)
{
	astreamer  *successor = streamer->bbs_next;

	astreamer_free(successor);
	pfree(streamer);
}
/*
 * Build an astreamer that unconditionally appends two blocks of NUL bytes to
 * the end of the stream, for use with an incomplete tarfile that the server
 * might send us.
 */
astreamer *
astreamer_tar_terminator_new(astreamer *next)
{
	astreamer  *terminator = palloc0(sizeof(*terminator));

	*((const astreamer_ops **) &terminator->bbs_ops) =
		&astreamer_tar_terminator_ops;
	terminator->bbs_next = next;

	return terminator;
}
/*
 * Content callback for the tar terminator: the data is relayed to the
 * successor exactly as received.
 */
static void
astreamer_tar_terminator_content(astreamer *streamer,
								 astreamer_member *member,
								 const char *data, int len,
								 astreamer_archive_context context)
{
	astreamer  *successor = streamer->bbs_next;

	/* Only raw, unparsed input is expected here. */
	Assert(member == NULL);
	Assert(context == ASTREAMER_UNKNOWN);

	astreamer_content(successor, member, data, len, context);
}
/*
 * End-of-stream processing for the tar terminator: emit the two blocks of
 * NUL bytes that the server fails to supply, then finalize the successor.
 */
static void
astreamer_tar_terminator_finalize(astreamer *streamer)
{
	char		zeros[2 * TAR_BLOCK_SIZE] = {0};

	astreamer_content(streamer->bbs_next, NULL, zeros,
					  2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN);
	astreamer_finalize(streamer->bbs_next);
}
/*
 * Release a tar terminator: first its successor, then the terminator object.
 */
static void
astreamer_tar_terminator_free(astreamer *streamer)
{
	astreamer  *successor = streamer->bbs_next;

	astreamer_free(successor);
	pfree(streamer);
}

View File

@@ -0,0 +1,368 @@
/*-------------------------------------------------------------------------
*
* astreamer_zstd.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/fe_utils/astreamer_zstd.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#ifdef USE_ZSTD
#include <zstd.h>
#endif
#include "common/logging.h"
#include "fe_utils/astreamer.h"
#ifdef USE_ZSTD
/*
 * State shared by the zstd compressor and decompressor astreamers.  A given
 * instance sets up only the context matching its direction.
 */
typedef struct astreamer_zstd_frame
{
/* generic astreamer state; its buffer backs zstd_outBuf */
astreamer base;
/* compression context, created by astreamer_zstd_compressor_new */
ZSTD_CCtx *cctx;
/* decompression context, created by astreamer_zstd_decompressor_new */
ZSTD_DCtx *dctx;
/* zstd's view of the output buffer: destination, capacity, bytes used */
ZSTD_outBuffer zstd_outBuf;
} astreamer_zstd_frame;
/* Callbacks and callback table for the zstd compressor. */
static void astreamer_zstd_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_zstd_compressor_finalize(astreamer *streamer);
static void astreamer_zstd_compressor_free(astreamer *streamer);
static const astreamer_ops astreamer_zstd_compressor_ops = {
.content = astreamer_zstd_compressor_content,
.finalize = astreamer_zstd_compressor_finalize,
.free = astreamer_zstd_compressor_free
};
/* Callbacks and callback table for the zstd decompressor. */
static void astreamer_zstd_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
static void astreamer_zstd_decompressor_free(astreamer *streamer);
static const astreamer_ops astreamer_zstd_decompressor_ops = {
.content = astreamer_zstd_decompressor_content,
.finalize = astreamer_zstd_decompressor_finalize,
.free = astreamer_zstd_decompressor_free
};
#endif
/*
 * Create a new base backup streamer that performs zstd compression of tar
 * blocks.
 *
 * 'next' receives the compressed output and must not be NULL.  'compress'
 * supplies the compression level and, when the corresponding option bits are
 * set, the worker count and the long-distance matching setting.
 *
 * Any failure to set up the zstd context is fatal.  In builds without zstd
 * support this function always fails.
 */
astreamer *
astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_ZSTD
	astreamer_zstd_frame *streamer;
	size_t		ret;

	Assert(next != NULL);

	streamer = palloc0(sizeof(astreamer_zstd_frame));

	*((const astreamer_ops **) &streamer->base.bbs_ops) =
		&astreamer_zstd_compressor_ops;

	streamer->base.bbs_next = next;
	initStringInfo(&streamer->base.bbs_buffer);
	enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());

	streamer->cctx = ZSTD_createCCtx();
	if (!streamer->cctx)
		pg_fatal("could not create zstd compression context");

	/* Set compression level */
	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
								 compress->level);
	if (ZSTD_isError(ret))
		pg_fatal("could not set zstd compression level to %d: %s",
				 compress->level, ZSTD_getErrorName(ret));

	/* Set # of workers, if specified */
	if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
	{
		/*
		 * On older versions of libzstd, this option does not exist, and
		 * trying to set it will fail. Similarly for newer versions if they
		 * are compiled without threading support.
		 */
		ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
									 compress->workers);
		if (ZSTD_isError(ret))
			pg_fatal("could not set compression worker count to %d: %s",
					 compress->workers, ZSTD_getErrorName(ret));
	}

	/* Set long-distance matching mode, if specified */
	if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
	{
		ret = ZSTD_CCtx_setParameter(streamer->cctx,
									 ZSTD_c_enableLongDistanceMatching,
									 compress->long_distance);
		/* Report failure the same way as the other parameters above. */
		if (ZSTD_isError(ret))
			pg_fatal("could not enable long-distance mode: %s",
					 ZSTD_getErrorName(ret));
	}

	/* Initialize the ZSTD output buffer. */
	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
	streamer->zstd_outBuf.pos = 0;

	return &streamer->base;
#else
	pg_fatal("this build does not support compression with %s", "ZSTD");
	return NULL;				/* keep compiler quiet */
#endif
}
#ifdef USE_ZSTD
/*
 * Compress the input data to output buffer.
 *
 * Find out the compression bound based on input data length for each
 * invocation to make sure that output buffer has enough capacity to
 * accommodate the compressed data. In case if the output buffer
 * capacity falls short of compression bound then forward the content
 * of output buffer to next streamer and empty the buffer.
 *
 * A compression error is fatal: once zstd has reported an error, retrying
 * with the same context cannot make progress, and continuing would at best
 * produce a corrupt stream.
 */
static void
astreamer_zstd_compressor_content(astreamer *streamer,
								  astreamer_member *member,
								  const char *data, int len,
								  astreamer_archive_context context)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
	ZSTD_inBuffer inBuf = {data, len, 0};

	while (inBuf.pos < inBuf.size)
	{
		size_t		yet_to_flush;
		size_t		max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);

		/*
		 * If the output buffer is not left with enough space, send the
		 * compressed bytes to the next streamer, and empty the buffer.
		 */
		if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
			max_needed)
		{
			astreamer_content(mystreamer->base.bbs_next, member,
							  mystreamer->zstd_outBuf.dst,
							  mystreamer->zstd_outBuf.pos,
							  context);

			/* Reset the ZSTD output buffer. */
			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->zstd_outBuf.pos = 0;
		}

		yet_to_flush =
			ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
								 &inBuf, ZSTD_e_continue);

		/* Give up rather than loop on input that may never advance. */
		if (ZSTD_isError(yet_to_flush))
			pg_fatal("could not compress data: %s",
					 ZSTD_getErrorName(yet_to_flush));
	}
}
/*
 * End-of-stream processing.
 *
 * Asks zstd to end the frame, forwarding the output buffer to the next
 * astreamer whenever it runs short of space, until zstd reports that nothing
 * is left to flush.  The remaining buffered output is then forwarded and the
 * next astreamer is finalized.
 *
 * A compression error is fatal.  zstd error codes are nonzero size_t values,
 * so merely logging the error would leave the flush loop below spinning
 * forever.
 */
static void
astreamer_zstd_compressor_finalize(astreamer *streamer)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
	size_t		yet_to_flush;

	do
	{
		ZSTD_inBuffer in = {NULL, 0, 0};
		size_t		max_needed = ZSTD_compressBound(0);

		/*
		 * If the output buffer is not left with enough space, send the
		 * compressed bytes to the next streamer, and empty the buffer.
		 */
		if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
			max_needed)
		{
			astreamer_content(mystreamer->base.bbs_next, NULL,
							  mystreamer->zstd_outBuf.dst,
							  mystreamer->zstd_outBuf.pos,
							  ASTREAMER_UNKNOWN);

			/* Reset the ZSTD output buffer. */
			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->zstd_outBuf.pos = 0;
		}

		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
											&mystreamer->zstd_outBuf,
											&in, ZSTD_e_end);

		if (ZSTD_isError(yet_to_flush))
			pg_fatal("could not compress data: %s",
					 ZSTD_getErrorName(yet_to_flush));

	} while (yet_to_flush > 0);

	/* Make sure to pass any remaining bytes to the next streamer. */
	if (mystreamer->zstd_outBuf.pos > 0)
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->zstd_outBuf.dst,
						  mystreamer->zstd_outBuf.pos,
						  ASTREAMER_UNKNOWN);

	astreamer_finalize(mystreamer->base.bbs_next);
}
/*
 * Release everything owned by a zstd compressing astreamer: the successor
 * astreamer, the zstd compression context, the output buffer, and finally
 * the astreamer object itself.
 */
static void
astreamer_zstd_compressor_free(astreamer *streamer)
{
	astreamer_zstd_frame *zstd = (astreamer_zstd_frame *) streamer;

	astreamer_free(zstd->base.bbs_next);
	ZSTD_freeCCtx(zstd->cctx);
	pfree(zstd->base.bbs_buffer.data);
	pfree(zstd);
}
#endif
/*
 * Build an astreamer that decompresses zstd-compressed input and hands the
 * decompressed bytes to 'next', which must not be NULL.
 *
 * In builds without zstd support this function always fails.
 */
astreamer *
astreamer_zstd_decompressor_new(astreamer *next)
{
#ifdef USE_ZSTD
	astreamer_zstd_frame *zstd;
	astreamer  *base;

	Assert(next != NULL);

	zstd = palloc0(sizeof(*zstd));
	base = &zstd->base;

	*((const astreamer_ops **) &base->bbs_ops) =
		&astreamer_zstd_decompressor_ops;
	base->bbs_next = next;

	/* Size the output buffer as recommended by zstd. */
	initStringInfo(&base->bbs_buffer);
	enlargeStringInfo(&base->bbs_buffer, ZSTD_DStreamOutSize());

	zstd->dctx = ZSTD_createDCtx();
	if (zstd->dctx == NULL)
		pg_fatal("could not create zstd decompression context");

	/* Point zstd's output descriptor at our (still empty) buffer. */
	zstd->zstd_outBuf.dst = base->bbs_buffer.data;
	zstd->zstd_outBuf.size = base->bbs_buffer.maxlen;
	zstd->zstd_outBuf.pos = 0;

	return base;
#else
	pg_fatal("this build does not support compression with %s", "ZSTD");
	return NULL;				/* keep compiler quiet */
#endif
}
#ifdef USE_ZSTD
/*
 * Decompress the input data to output buffer until we run out of input
 * data. Each time the output buffer is full, pass on the decompressed data
 * to the next streamer.
 *
 * A decompression error is fatal: the input is unusable from that point on,
 * and retrying may never consume any more of it.
 */
static void
astreamer_zstd_decompressor_content(astreamer *streamer,
									astreamer_member *member,
									const char *data, int len,
									astreamer_archive_context context)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
	ZSTD_inBuffer inBuf = {data, len, 0};

	while (inBuf.pos < inBuf.size)
	{
		size_t		ret;

		/*
		 * If output buffer is full then forward the content to next streamer
		 * and update the output buffer.
		 */
		if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
		{
			astreamer_content(mystreamer->base.bbs_next, member,
							  mystreamer->zstd_outBuf.dst,
							  mystreamer->zstd_outBuf.pos,
							  context);

			/* Reset the ZSTD output buffer. */
			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
			mystreamer->zstd_outBuf.pos = 0;
		}

		ret = ZSTD_decompressStream(mystreamer->dctx,
									&mystreamer->zstd_outBuf, &inBuf);

		/* Give up rather than loop on input that may never advance. */
		if (ZSTD_isError(ret))
			pg_fatal("could not decompress data: %s",
					 ZSTD_getErrorName(ret));
	}
}
/*
 * End-of-stream processing.
 *
 * Forwards any decompressed output still pending in our buffer to the next
 * astreamer, then finalizes that astreamer.
 */
static void
astreamer_zstd_decompressor_finalize(astreamer *streamer)
{
	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;

	/*
	 * End of the stream, if there is some pending data in output buffers then
	 * we must forward it to next streamer.  Only the first zstd_outBuf.pos
	 * bytes of the buffer are valid output; sending the whole buffer would
	 * append stale bytes to the stream.
	 */
	if (mystreamer->zstd_outBuf.pos > 0)
		astreamer_content(mystreamer->base.bbs_next, NULL,
						  mystreamer->zstd_outBuf.dst,
						  mystreamer->zstd_outBuf.pos,
						  ASTREAMER_UNKNOWN);

	astreamer_finalize(mystreamer->base.bbs_next);
}
/*
 * Release everything owned by a zstd decompressing astreamer: the successor
 * astreamer, the zstd decompression context, the output buffer, and finally
 * the astreamer object itself.
 */
static void
astreamer_zstd_decompressor_free(astreamer *streamer)
{
	astreamer_zstd_frame *zstd = (astreamer_zstd_frame *) streamer;

	astreamer_free(zstd->base.bbs_next);
	ZSTD_freeDCtx(zstd->dctx);
	pfree(zstd->base.bbs_buffer.data);
	pfree(zstd);
}
#endif

View File

@@ -2,6 +2,11 @@
fe_utils_sources = files(
'archive.c',
'astreamer_file.c',
'astreamer_gzip.c',
'astreamer_lz4.c',
'astreamer_tar.c',
'astreamer_zstd.c',
'cancel.c',
'conditional.c',
'connect_utils.c',