1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-25 21:42:33 +03:00

Add support for zstd base backup compression.

Both client-side compression and server-side compression are now
supported for zstd. In addition, a backup compressed by the server
using zstd can now be decompressed by the client in order to
accommodate the use of -Fp.

Jeevan Ladhe, with some edits by me.

Discussion: http://postgr.es/m/CA+Tgmobyzfbz=gyze2_LL1ZumZunmaEKbHQxjrFkOR7APZGu-g@mail.gmail.com
This commit is contained in:
Robert Haas 2022-03-07 15:08:45 -05:00
parent c28839c832
commit 7cf085f077
17 changed files with 748 additions and 24 deletions

View File

@ -2724,8 +2724,8 @@ The commands accepted in replication mode are:
<listitem> <listitem>
<para> <para>
Instructs the server to compress the backup using the specified Instructs the server to compress the backup using the specified
method. Currently, the supported methods are <literal>gzip</literal> method. Currently, the supported methods are <literal>gzip</literal>,
and <literal>lz4</literal>. <literal>lz4</literal>, and <literal>zstd</literal>.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
@ -2737,7 +2737,8 @@ The commands accepted in replication mode are:
Specifies the compression level to be used. This should only be Specifies the compression level to be used. This should only be
used in conjunction with the <literal>COMPRESSION</literal> option. used in conjunction with the <literal>COMPRESSION</literal> option.
For <literal>gzip</literal> the value should be an integer between 1 For <literal>gzip</literal> the value should be an integer between 1
and 9, and for <literal>lz4</literal> it should be between 1 and 12. and 9, for <literal>lz4</literal> between 1 and 12, and for
<literal>zstd</literal> it should be between 1 and 22.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>

View File

@ -417,30 +417,33 @@ PostgreSQL documentation
specify <literal>-Xfetch</literal>. specify <literal>-Xfetch</literal>.
</para> </para>
<para> <para>
The compression method can be set to <literal>gzip</literal> or The compression method can be set to <literal>gzip</literal>,
<literal>lz4</literal>, or <literal>none</literal> for no <literal>lz4</literal>, <literal>zstd</literal>, or
compression. A compression level can be optionally specified, by <literal>none</literal> for no compression. A compression level can
appending the level number after a colon (<literal>:</literal>). If no optionally be specified, by appending the level number after a colon
level is specified, the default compression level will be used. If (<literal>:</literal>). If no level is specified, the default
only a level is specified without mentioning an algorithm, compression level will be used. If only a level is specified without
<literal>gzip</literal> compression will be used if the level is mentioning an algorithm, <literal>gzip</literal> compression will be
greater than 0, and no compression will be used if the level is 0. used if the level is greater than 0, and no compression will be used if
the level is 0.
</para> </para>
<para> <para>
When the tar format is used with <literal>gzip</literal> or When the tar format is used with <literal>gzip</literal>,
<literal>lz4</literal>, the suffix <filename>.gz</filename> or <literal>lz4</literal>, or <literal>zstd</literal>, the suffix
<filename>.lz4</filename> will automatically be added to all tar <filename>.gz</filename>, <filename>.lz4</filename>, or
filenames. When the plain format is used, client-side compression may <filename>.zst</filename>, respectively, will be automatically added to
not be specified, but it is still possible to request server-side all tar filenames. When the plain format is used, client-side
compression. If this is done, the server will compress the backup for compression may not be specified, but it is still possible to request
transmission, and the client will decompress and extract it. server-side compression. If this is done, the server will compress the
backup for transmission, and the client will decompress and extract it.
</para> </para>
<para> <para>
When this option is used in combination with When this option is used in combination with
<literal>-Xstream</literal>, <literal>pg_wal.tar</literal> will <literal>-Xstream</literal>, <literal>pg_wal.tar</literal> will
be compressed using <literal>gzip</literal> if client-side gzip be compressed using <literal>gzip</literal> if client-side gzip
compression is selected, but will not be compressed if server-side compression is selected, but will not be compressed if any other
compresion or LZ4 compresion is selected. compression algorithm is selected, or if server-side compression
is selected.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>

View File

@ -20,6 +20,7 @@ OBJS = \
basebackup_copy.o \ basebackup_copy.o \
basebackup_gzip.o \ basebackup_gzip.o \
basebackup_lz4.o \ basebackup_lz4.o \
basebackup_zstd.o \
basebackup_progress.o \ basebackup_progress.o \
basebackup_server.o \ basebackup_server.o \
basebackup_sink.o \ basebackup_sink.o \

View File

@ -64,7 +64,8 @@ typedef enum
{ {
BACKUP_COMPRESSION_NONE, BACKUP_COMPRESSION_NONE,
BACKUP_COMPRESSION_GZIP, BACKUP_COMPRESSION_GZIP,
BACKUP_COMPRESSION_LZ4 BACKUP_COMPRESSION_LZ4,
BACKUP_COMPRESSION_ZSTD
} basebackup_compression_type; } basebackup_compression_type;
typedef struct typedef struct
@ -906,6 +907,8 @@ parse_basebackup_options(List *options, basebackup_options *opt)
opt->compression = BACKUP_COMPRESSION_GZIP; opt->compression = BACKUP_COMPRESSION_GZIP;
else if (strcmp(optval, "lz4") == 0) else if (strcmp(optval, "lz4") == 0)
opt->compression = BACKUP_COMPRESSION_LZ4; opt->compression = BACKUP_COMPRESSION_LZ4;
else if (strcmp(optval, "zstd") == 0)
opt->compression = BACKUP_COMPRESSION_ZSTD;
else else
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_SYNTAX_ERROR),
@ -1026,6 +1029,8 @@ SendBaseBackup(BaseBackupCmd *cmd)
sink = bbsink_gzip_new(sink, opt.compression_level); sink = bbsink_gzip_new(sink, opt.compression_level);
else if (opt.compression == BACKUP_COMPRESSION_LZ4) else if (opt.compression == BACKUP_COMPRESSION_LZ4)
sink = bbsink_lz4_new(sink, opt.compression_level); sink = bbsink_lz4_new(sink, opt.compression_level);
else if (opt.compression == BACKUP_COMPRESSION_ZSTD)
sink = bbsink_zstd_new(sink, opt.compression_level);
/* Set up progress reporting. */ /* Set up progress reporting. */
sink = bbsink_progress_new(sink, opt.progress); sink = bbsink_progress_new(sink, opt.progress);

View File

@ -0,0 +1,299 @@
/*-------------------------------------------------------------------------
*
* basebackup_zstd.c
* Basebackup sink implementing zstd compression.
*
* Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/basebackup_zstd.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#ifdef HAVE_LIBZSTD
#include <zstd.h>
#endif
#include "replication/basebackup_sink.h"
#ifdef HAVE_LIBZSTD
typedef struct bbsink_zstd
{
/* Common information for all types of sink. */
bbsink base;
/* Compression level */
int compresslevel;
ZSTD_CCtx *cctx;
ZSTD_outBuffer zstd_outBuf;
} bbsink_zstd;
static void bbsink_zstd_begin_backup(bbsink *sink);
static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
static void bbsink_zstd_end_archive(bbsink *sink);
static void bbsink_zstd_cleanup(bbsink *sink);
static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
TimeLineID endtli);
const bbsink_ops bbsink_zstd_ops = {
.begin_backup = bbsink_zstd_begin_backup,
.begin_archive = bbsink_zstd_begin_archive,
.archive_contents = bbsink_zstd_archive_contents,
.end_archive = bbsink_zstd_end_archive,
.begin_manifest = bbsink_forward_begin_manifest,
.manifest_contents = bbsink_zstd_manifest_contents,
.end_manifest = bbsink_forward_end_manifest,
.end_backup = bbsink_zstd_end_backup,
.cleanup = bbsink_zstd_cleanup
};
#endif
/*
* Create a new basebackup sink that performs zstd compression using the
* designated compression level.
*/
bbsink *
bbsink_zstd_new(bbsink *next, int compresslevel)
{
#ifndef HAVE_LIBZSTD
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("zstd compression is not supported by this build")));
return NULL; /* keep compiler quiet */
#else
bbsink_zstd *sink;
Assert(next != NULL);
if (compresslevel < 0 || compresslevel > 22)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("zstd compression level %d is out of range",
compresslevel)));
sink = palloc0(sizeof(bbsink_zstd));
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
sink->base.bbs_next = next;
sink->compresslevel = compresslevel;
return &sink->base;
#endif
}
#ifdef HAVE_LIBZSTD
/*
* Begin backup.
*/
static void
bbsink_zstd_begin_backup(bbsink *sink)
{
bbsink_zstd *mysink = (bbsink_zstd *) sink;
size_t output_buffer_bound;
mysink->cctx = ZSTD_createCCtx();
if (!mysink->cctx)
elog(ERROR, "could not create zstd compression context");
ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
mysink->compresslevel);
/*
* We need our own buffer, because we're going to pass different data to
* the next sink than what gets passed to us.
*/
mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
/*
* Make sure that the next sink's bbs_buffer is big enough to accommodate
* the compressed input buffer.
*/
output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
/*
* The buffer length is expected to be a multiple of BLCKSZ, so round up.
*/
output_buffer_bound = output_buffer_bound + BLCKSZ -
(output_buffer_bound % BLCKSZ);
bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
}
/*
* Prepare to compress the next archive.
*/
static void
bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
{
bbsink_zstd *mysink = (bbsink_zstd *) sink;
char *zstd_archive_name;
/*
* At the start of each archive we reset the state to start a new
* compression operation. The parameters are sticky and they will stick
* around as we are resetting with option ZSTD_reset_session_only.
*/
ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
mysink->zstd_outBuf.pos = 0;
/* Add ".zst" to the archive name. */
zstd_archive_name = psprintf("%s.zst", archive_name);
Assert(sink->bbs_next != NULL);
bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
pfree(zstd_archive_name);
}
/*
* Compress the input data to the output buffer until we run out of input
* data. Each time the output buffer falls below the compression bound for
* the input buffer, invoke the archive_contents() method for the next sink.
*
* Note that since we're compressing the input, it may very commonly happen
* that we consume all the input data without filling the output buffer. In
* that case, the compressed representation of the current input data won't
* actually be sent to the next bbsink until a later call to this function,
* or perhaps even not until bbsink_zstd_end_archive() is invoked.
*/
static void
bbsink_zstd_archive_contents(bbsink *sink, size_t len)
{
bbsink_zstd *mysink = (bbsink_zstd *) sink;
ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
while (inBuf.pos < inBuf.size)
{
size_t yet_to_flush;
size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
/*
* If the out buffer is not left with enough space, send the output
* buffer to the next sink, and reset it.
*/
if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
{
bbsink_archive_contents(mysink->base.bbs_next,
mysink->zstd_outBuf.pos);
mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
mysink->zstd_outBuf.size =
mysink->base.bbs_next->bbs_buffer_length;
mysink->zstd_outBuf.pos = 0;
}
yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
&inBuf, ZSTD_e_continue);
if (ZSTD_isError(yet_to_flush))
elog(ERROR,
"could not compress data: %s",
ZSTD_getErrorName(yet_to_flush));
}
}
/*
* There might be some data inside zstd's internal buffers; we need to get that
* flushed out, also end the zstd frame and then get that forwarded to the
* successor sink as archive content.
*
* Then we can end processing for this archive.
*/
static void
bbsink_zstd_end_archive(bbsink *sink)
{
bbsink_zstd *mysink = (bbsink_zstd *) sink;
size_t yet_to_flush;
do
{
ZSTD_inBuffer in = {NULL, 0, 0};
size_t max_needed = ZSTD_compressBound(0);
/*
* If the out buffer is not left with enough space, send the output
* buffer to the next sink, and reset it.
*/
if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
{
bbsink_archive_contents(mysink->base.bbs_next,
mysink->zstd_outBuf.pos);
mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
mysink->zstd_outBuf.size =
mysink->base.bbs_next->bbs_buffer_length;
mysink->zstd_outBuf.pos = 0;
}
yet_to_flush = ZSTD_compressStream2(mysink->cctx,
&mysink->zstd_outBuf,
&in, ZSTD_e_end);
if (ZSTD_isError(yet_to_flush))
elog(ERROR, "could not compress data: %s",
ZSTD_getErrorName(yet_to_flush));
} while (yet_to_flush > 0);
/* Make sure to pass any remaining bytes to the next sink. */
if (mysink->zstd_outBuf.pos > 0)
bbsink_archive_contents(mysink->base.bbs_next,
mysink->zstd_outBuf.pos);
/* Pass on the information that this archive has ended. */
bbsink_forward_end_archive(sink);
}
/*
* Free the resources and context.
*/
static void
bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
TimeLineID endtli)
{
bbsink_zstd *mysink = (bbsink_zstd *) sink;
/* Release the context. */
if (mysink->cctx)
{
ZSTD_freeCCtx(mysink->cctx);
mysink->cctx = NULL;
}
bbsink_forward_end_backup(sink, endptr, endtli);
}
/*
* Manifest contents are not compressed, but we do need to copy them into
* the successor sink's buffer, because we have our own.
*/
static void
bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
{
memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
bbsink_manifest_contents(sink->bbs_next, len);
}
/*
* In case the backup fails, make sure we free any compression context that
* got allocated, so that we don't leak memory.
*/
static void
bbsink_zstd_cleanup(bbsink *sink)
{
bbsink_zstd *mysink = (bbsink_zstd *) sink;
/* Release the context if not already released. */
if (mysink->cctx)
{
ZSTD_freeCCtx(mysink->cctx);
mysink->cctx = NULL;
}
}
#endif

View File

@ -44,6 +44,7 @@ BBOBJS = \
bbstreamer_gzip.o \ bbstreamer_gzip.o \
bbstreamer_inject.o \ bbstreamer_inject.o \
bbstreamer_lz4.o \ bbstreamer_lz4.o \
bbstreamer_zstd.o \
bbstreamer_tar.o bbstreamer_tar.o
all: pg_basebackup pg_receivewal pg_recvlogical all: pg_basebackup pg_receivewal pg_recvlogical

View File

@ -209,6 +209,9 @@ extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next, extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
int compresslevel); int compresslevel);
extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next); extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
int compresslevel);
extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);

View File

@ -0,0 +1,338 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_zstd.c
*
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_zstd.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#ifdef HAVE_LIBZSTD
#include <zstd.h>
#endif
#include "bbstreamer.h"
#include "common/logging.h"
#ifdef HAVE_LIBZSTD
typedef struct bbstreamer_zstd_frame
{
bbstreamer base;
ZSTD_CCtx *cctx;
ZSTD_DCtx *dctx;
ZSTD_outBuffer zstd_outBuf;
} bbstreamer_zstd_frame;
static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
.content = bbstreamer_zstd_compressor_content,
.finalize = bbstreamer_zstd_compressor_finalize,
.free = bbstreamer_zstd_compressor_free
};
static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
.content = bbstreamer_zstd_decompressor_content,
.finalize = bbstreamer_zstd_decompressor_finalize,
.free = bbstreamer_zstd_decompressor_free
};
#endif
/*
* Create a new base backup streamer that performs zstd compression of tar
* blocks.
*/
bbstreamer *
bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel)
{
#ifdef HAVE_LIBZSTD
bbstreamer_zstd_frame *streamer;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_zstd_frame));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_zstd_compressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
streamer->cctx = ZSTD_createCCtx();
if (!streamer->cctx)
pg_log_error("could not create zstd compression context");
/* Initialize stream compression preferences */
ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
compresslevel);
/* Initialize the ZSTD output buffer. */
streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
streamer->zstd_outBuf.pos = 0;
return &streamer->base;
#else
pg_log_error("this build does not support zstd compression");
exit(1);
#endif
}
#ifdef HAVE_LIBZSTD
/*
* Compress the input data to output buffer.
*
* Find out the compression bound based on input data length for each
* invocation to make sure that output buffer has enough capacity to
* accommodate the compressed data. In case if the output buffer
* capacity falls short of compression bound then forward the content
* of output buffer to next streamer and empty the buffer.
*/
static void
bbstreamer_zstd_compressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
ZSTD_inBuffer inBuf = {data, len, 0};
while (inBuf.pos < inBuf.size)
{
size_t yet_to_flush;
size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
/*
* If the output buffer is not left with enough space, send the
* compressed bytes to the next streamer, and empty the buffer.
*/
if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
max_needed)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
context);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
mystreamer->zstd_outBuf.pos = 0;
}
yet_to_flush =
ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
&inBuf, ZSTD_e_continue);
if (ZSTD_isError(yet_to_flush))
pg_log_error("could not compress data: %s",
ZSTD_getErrorName(yet_to_flush));
}
}
/*
* End-of-stream processing.
*/
static void
bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
size_t yet_to_flush;
do
{
ZSTD_inBuffer in = {NULL, 0, 0};
size_t max_needed = ZSTD_compressBound(0);
/*
* If the output buffer is not left with enough space, send the
* compressed bytes to the next streamer, and empty the buffer.
*/
if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
max_needed)
{
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
BBSTREAMER_UNKNOWN);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
mystreamer->zstd_outBuf.pos = 0;
}
yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
&mystreamer->zstd_outBuf,
&in, ZSTD_e_end);
if (ZSTD_isError(yet_to_flush))
pg_log_error("could not compress data: %s",
ZSTD_getErrorName(yet_to_flush));
} while (yet_to_flush > 0);
/* Make sure to pass any remaining bytes to the next streamer. */
if (mystreamer->zstd_outBuf.pos > 0)
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
BBSTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_zstd_compressor_free(bbstreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
bbstreamer_free(streamer->bbs_next);
ZSTD_freeCCtx(mystreamer->cctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
}
#endif
/*
* Create a new base backup streamer that performs decompression of zstd
* compressed blocks.
*/
bbstreamer *
bbstreamer_zstd_decompressor_new(bbstreamer *next)
{
#ifdef HAVE_LIBZSTD
bbstreamer_zstd_frame *streamer;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_zstd_frame));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_zstd_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
streamer->dctx = ZSTD_createDCtx();
if (!streamer->dctx)
{
pg_log_error("could not create zstd decompression context");
exit(1);
}
/* Initialize the ZSTD output buffer. */
streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
streamer->zstd_outBuf.pos = 0;
return &streamer->base;
#else
pg_log_error("this build does not support compression");
exit(1);
#endif
}
#ifdef HAVE_LIBZSTD
/*
* Decompress the input data to output buffer until we run out of input
* data. Each time the output buffer is full, pass on the decompressed data
* to the next streamer.
*/
static void
bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
ZSTD_inBuffer inBuf = {data, len, 0};
while (inBuf.pos < inBuf.size)
{
size_t ret;
/*
* If output buffer is full then forward the content to next streamer
* and update the output buffer.
*/
if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
context);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
mystreamer->zstd_outBuf.pos = 0;
}
ret = ZSTD_decompressStream(mystreamer->dctx,
&mystreamer->zstd_outBuf, &inBuf);
if (ZSTD_isError(ret))
pg_log_error("could not decompress data: %s", ZSTD_getErrorName(ret));
}
}
/*
* End-of-stream processing.
*/
static void
bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
if (mystreamer->zstd_outBuf.pos > 0)
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
BBSTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
bbstreamer_free(streamer->bbs_next);
ZSTD_freeDCtx(mystreamer->dctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
}
#endif

View File

@ -405,8 +405,9 @@ usage(void)
printf(_(" -X, --wal-method=none|fetch|stream\n" printf(_(" -X, --wal-method=none|fetch|stream\n"
" include required WAL files with specified method\n")); " include required WAL files with specified method\n"));
printf(_(" -z, --gzip compress tar output\n")); printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress={[{client,server}-]gzip,lz4,none}[:LEVEL] or [LEVEL]\n" printf(_(" -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL]\n"
" compress tar output with given compression method or level\n")); " compress tar output with given compression method or level\n"));
printf(_(" -Z, --compress=none do not compress tar output\n"));
printf(_("\nGeneral options:\n")); printf(_("\nGeneral options:\n"));
printf(_(" -c, --checkpoint=fast|spread\n" printf(_(" -c, --checkpoint=fast|spread\n"
" set fast or spread checkpointing\n")); " set fast or spread checkpointing\n"));
@ -1067,6 +1068,21 @@ parse_compress_options(char *src, WalCompressionMethod *methodres,
*methodres = COMPRESSION_LZ4; *methodres = COMPRESSION_LZ4;
*locationres = COMPRESS_LOCATION_SERVER; *locationres = COMPRESS_LOCATION_SERVER;
} }
else if (pg_strcasecmp(firstpart, "zstd") == 0)
{
*methodres = COMPRESSION_ZSTD;
*locationres = COMPRESS_LOCATION_UNSPECIFIED;
}
else if (pg_strcasecmp(firstpart, "client-zstd") == 0)
{
*methodres = COMPRESSION_ZSTD;
*locationres = COMPRESS_LOCATION_CLIENT;
}
else if (pg_strcasecmp(firstpart, "server-zstd") == 0)
{
*methodres = COMPRESSION_ZSTD;
*locationres = COMPRESS_LOCATION_SERVER;
}
else if (pg_strcasecmp(firstpart, "none") == 0) else if (pg_strcasecmp(firstpart, "none") == 0)
{ {
*methodres = COMPRESSION_NONE; *methodres = COMPRESSION_NONE;
@ -1191,7 +1207,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
bool inject_manifest; bool inject_manifest;
bool is_tar, bool is_tar,
is_tar_gz, is_tar_gz,
is_tar_lz4; is_tar_lz4,
is_tar_zstd;
bool must_parse_archive; bool must_parse_archive;
int archive_name_len = strlen(archive_name); int archive_name_len = strlen(archive_name);
@ -1214,6 +1231,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
is_tar_lz4 = (archive_name_len > 8 && is_tar_lz4 = (archive_name_len > 8 &&
strcmp(archive_name + archive_name_len - 4, ".lz4") == 0); strcmp(archive_name + archive_name_len - 4, ".lz4") == 0);
/* Is this a ZSTD archive? */
is_tar_zstd = (archive_name_len > 8 &&
strcmp(archive_name + archive_name_len - 4, ".zst") == 0);
/* /*
* We have to parse the archive if (1) we're suppose to extract it, or if * We have to parse the archive if (1) we're suppose to extract it, or if
* (2) we need to inject backup_manifest or recovery configuration into it. * (2) we need to inject backup_manifest or recovery configuration into it.
@ -1223,7 +1244,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
(spclocation == NULL && writerecoveryconf)); (spclocation == NULL && writerecoveryconf));
/* At present, we only know how to parse tar archives. */ /* At present, we only know how to parse tar archives. */
if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4) if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4
&& !is_tar_zstd)
{ {
pg_log_error("unable to parse archive: %s", archive_name); pg_log_error("unable to parse archive: %s", archive_name);
pg_log_info("only tar archives can be parsed"); pg_log_info("only tar archives can be parsed");
@ -1295,6 +1317,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
streamer = bbstreamer_lz4_compressor_new(streamer, streamer = bbstreamer_lz4_compressor_new(streamer,
compresslevel); compresslevel);
} }
else if (compressmethod == COMPRESSION_ZSTD)
{
strlcat(archive_filename, ".zst", sizeof(archive_filename));
streamer = bbstreamer_plain_writer_new(archive_filename,
archive_file);
streamer = bbstreamer_zstd_compressor_new(streamer,
compresslevel);
}
else else
{ {
Assert(false); /* not reachable */ Assert(false); /* not reachable */
@ -1353,6 +1383,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
streamer = bbstreamer_gzip_decompressor_new(streamer); streamer = bbstreamer_gzip_decompressor_new(streamer);
else if (compressmethod == COMPRESSION_LZ4) else if (compressmethod == COMPRESSION_LZ4)
streamer = bbstreamer_lz4_decompressor_new(streamer); streamer = bbstreamer_lz4_decompressor_new(streamer);
else if (compressmethod == COMPRESSION_ZSTD)
streamer = bbstreamer_zstd_decompressor_new(streamer);
} }
/* Return the results. */ /* Return the results. */
@ -2020,6 +2052,9 @@ BaseBackup(void)
case COMPRESSION_LZ4: case COMPRESSION_LZ4:
compressmethodstr = "lz4"; compressmethodstr = "lz4";
break; break;
case COMPRESSION_ZSTD:
compressmethodstr = "zstd";
break;
default: default:
Assert(false); Assert(false);
break; break;
@ -2869,6 +2904,14 @@ main(int argc, char **argv)
exit(1); exit(1);
} }
break; break;
case COMPRESSION_ZSTD:
if (compresslevel > 22)
{
pg_log_error("compression level %d of method %s higher than maximum of 22",
compresslevel, "zstd");
exit(1);
}
break;
} }
/* /*

View File

@ -904,6 +904,10 @@ main(int argc, char **argv)
exit(1); exit(1);
#endif #endif
break; break;
case COMPRESSION_ZSTD:
pg_log_error("compression with %s is not yet supported", "ZSTD");
exit(1);
} }

View File

@ -24,6 +24,7 @@ typedef enum
{ {
COMPRESSION_GZIP, COMPRESSION_GZIP,
COMPRESSION_LZ4, COMPRESSION_LZ4,
COMPRESSION_ZSTD,
COMPRESSION_NONE COMPRESSION_NONE
} WalCompressionMethod; } WalCompressionMethod;

View File

@ -10,6 +10,7 @@ export TAR
# name. # name.
export GZIP_PROGRAM=$(GZIP) export GZIP_PROGRAM=$(GZIP)
export LZ4=$(LZ4) export LZ4=$(LZ4)
export ZSTD=$(ZSTD)
subdir = src/bin/pg_verifybackup subdir = src/bin/pg_verifybackup
top_builddir = ../../.. top_builddir = ../../..

View File

@ -42,6 +42,14 @@ my @test_configuration = (
'decompress_program' => $ENV{'LZ4'}, 'decompress_program' => $ENV{'LZ4'},
'decompress_flags' => [ '-d', '-m'], 'decompress_flags' => [ '-d', '-m'],
'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
},
{
'compression_method' => 'zstd',
'backup_flags' => ['--compress', 'server-zstd'],
'backup_archive' => 'base.tar.zst',
'decompress_program' => $ENV{'ZSTD'},
'decompress_flags' => [ '-d' ],
'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
} }
); );
@ -107,6 +115,7 @@ for my $tc (@test_configuration)
# Cleanup. # Cleanup.
unlink($backup_path . '/backup_manifest'); unlink($backup_path . '/backup_manifest');
unlink($backup_path . '/base.tar'); unlink($backup_path . '/base.tar');
unlink($backup_path . '/' . $tc->{'backup_archive'});
rmtree($extract_path); rmtree($extract_path);
} }
} }

View File

@ -31,6 +31,11 @@ my @test_configuration = (
'compression_method' => 'lz4', 'compression_method' => 'lz4',
'backup_flags' => ['--compress', 'server-lz4:5'], 'backup_flags' => ['--compress', 'server-lz4:5'],
'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
},
{
'compression_method' => 'zstd',
'backup_flags' => ['--compress', 'server-zstd:5'],
'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
} }
); );

View File

@ -42,6 +42,14 @@ my @test_configuration = (
'decompress_flags' => [ '-d' ], 'decompress_flags' => [ '-d' ],
'output_file' => 'base.tar', 'output_file' => 'base.tar',
'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
},
{
'compression_method' => 'zstd',
'backup_flags' => ['--compress', 'client-zstd:5'],
'backup_archive' => 'base.tar.zst',
'decompress_program' => $ENV{'ZSTD'},
'decompress_flags' => [ '-d' ],
'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
} }
); );

View File

@ -285,6 +285,7 @@ extern void bbsink_forward_cleanup(bbsink *sink);
extern bbsink *bbsink_copystream_new(bool send_to_client); extern bbsink *bbsink_copystream_new(bool send_to_client);
extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel); extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel);
extern bbsink *bbsink_lz4_new(bbsink *next, int compresslevel); extern bbsink *bbsink_lz4_new(bbsink *next, int compresslevel);
extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel);
extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
extern bbsink *bbsink_server_new(bbsink *next, char *pathname); extern bbsink *bbsink_server_new(bbsink *next, char *pathname);
extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);

View File

@ -380,6 +380,7 @@ sub mkvcbuild
$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c');
$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c');
$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c');
$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_zstd.c');
$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c');
$pgbasebackup->AddLibrary('ws2_32.lib'); $pgbasebackup->AddLibrary('ws2_32.lib');