From 7cf085f077df8dd9b80cf1f5964b5b8c142be496 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 7 Mar 2022 15:08:45 -0500 Subject: [PATCH] Add support for zstd base backup compression. Both client-side compression and server-side compression are now supported for zstd. In addition, a backup compressed by the server using zstd can now be decompressed by the client in order to accommodate the use of -Fp. Jeevan Ladhe, with some edits by me. Discussion: http://postgr.es/m/CA+Tgmobyzfbz=gyze2_LL1ZumZunmaEKbHQxjrFkOR7APZGu-g@mail.gmail.com --- doc/src/sgml/protocol.sgml | 7 +- doc/src/sgml/ref/pg_basebackup.sgml | 37 +- src/backend/replication/Makefile | 1 + src/backend/replication/basebackup.c | 7 +- src/backend/replication/basebackup_zstd.c | 299 ++++++++++++++++ src/bin/pg_basebackup/Makefile | 1 + src/bin/pg_basebackup/bbstreamer.h | 3 + src/bin/pg_basebackup/bbstreamer_zstd.c | 338 ++++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 49 ++- src/bin/pg_basebackup/pg_receivewal.c | 4 + src/bin/pg_basebackup/walmethods.h | 1 + src/bin/pg_verifybackup/Makefile | 1 + src/bin/pg_verifybackup/t/008_untar.pl | 9 + src/bin/pg_verifybackup/t/009_extract.pl | 5 + src/bin/pg_verifybackup/t/010_client_untar.pl | 8 + src/include/replication/basebackup_sink.h | 1 + src/tools/msvc/Mkvcbuild.pm | 1 + 17 files changed, 748 insertions(+), 24 deletions(-) create mode 100644 src/backend/replication/basebackup_zstd.c create mode 100644 src/bin/pg_basebackup/bbstreamer_zstd.c diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index c51c4254a70..0695bcd423e 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2724,8 +2724,8 @@ The commands accepted in replication mode are: Instructs the server to compress the backup using the specified - method. Currently, the supported methods are gzip - and lz4. + method. Currently, the supported methods are gzip, + lz4, and zstd. @@ -2737,7 +2737,8 @@ The commands accepted in replication mode are: Specifies the compression level to be used. This should only be used in conjunction with the COMPRESSION option. For gzip the value should be an integer between 1 - and 9, and for lz4 it should be between 1 and 12. + and 9, for lz4 between 1 and 12, and for + zstd it should be between 1 and 22. diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 53aa40dcd19..4a630b59b70 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -417,30 +417,33 @@ PostgreSQL documentation specify -Xfetch. - The compression method can be set to gzip or - lz4, or none for no - compression. A compression level can be optionally specified, by - appending the level number after a colon (:). If no - level is specified, the default compression level will be used. If - only a level is specified without mentioning an algorithm, - gzip compression will be used if the level is - greater than 0, and no compression will be used if the level is 0. + The compression method can be set to gzip, + lz4, zstd, or + none for no compression. A compression level can + optionally be specified, by appending the level number after a colon + (:). If no level is specified, the default + compression level will be used. If only a level is specified without + mentioning an algorithm, gzip compression will be + used if the level is greater than 0, and no compression will be used if + the level is 0. - When the tar format is used with gzip or - lz4, the suffix .gz or - .lz4 will automatically be added to all tar - filenames. When the plain format is used, client-side compression may - not be specified, but it is still possible to request server-side - compression. If this is done, the server will compress the backup for - transmission, and the client will decompress and extract it. + When the tar format is used with gzip, + lz4, or zstd, the suffix + .gz, .lz4, or + .zst, respectively, will be automatically added to + all tar filenames. When the plain format is used, client-side + compression may not be specified, but it is still possible to request + server-side compression. If this is done, the server will compress the + backup for transmission, and the client will decompress and extract it. When this option is used in combination with -Xstream, pg_wal.tar will be compressed using gzip if client-side gzip - compression is selected, but will not be compressed if server-side - compresion or LZ4 compresion is selected. + compression is selected, but will not be compressed if any other + compression algorithm is selected, or if server-side compression + is selected. diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 74043ff331d..2e6de7007fa 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -20,6 +20,7 @@ OBJS = \ basebackup_copy.o \ basebackup_gzip.o \ basebackup_lz4.o \ + basebackup_zstd.o \ basebackup_progress.o \ basebackup_server.o \ basebackup_sink.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 0bf28b55d7f..2378ce5c5e6 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -64,7 +64,8 @@ typedef enum { BACKUP_COMPRESSION_NONE, BACKUP_COMPRESSION_GZIP, - BACKUP_COMPRESSION_LZ4 + BACKUP_COMPRESSION_LZ4, + BACKUP_COMPRESSION_ZSTD } basebackup_compression_type; typedef struct @@ -906,6 +907,8 @@ parse_basebackup_options(List *options, basebackup_options *opt) opt->compression = BACKUP_COMPRESSION_GZIP; else if (strcmp(optval, "lz4") == 0) opt->compression = BACKUP_COMPRESSION_LZ4; + else if (strcmp(optval, "zstd") == 0) + opt->compression = BACKUP_COMPRESSION_ZSTD; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1026,6 +1029,8 @@ SendBaseBackup(BaseBackupCmd *cmd) sink = bbsink_gzip_new(sink, opt.compression_level); else if (opt.compression == BACKUP_COMPRESSION_LZ4) sink = bbsink_lz4_new(sink, opt.compression_level); + else if (opt.compression == BACKUP_COMPRESSION_ZSTD) + sink = bbsink_zstd_new(sink, opt.compression_level); /* Set up progress reporting. */ sink = bbsink_progress_new(sink, opt.progress); diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c new file mode 100644 index 00000000000..e3f9b1d4dc0 --- /dev/null +++ b/src/backend/replication/basebackup_zstd.c @@ -0,0 +1,299 @@ +/*------------------------------------------------------------------------- + * + * basebackup_zstd.c + * Basebackup sink implementing zstd compression. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_zstd.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#ifdef HAVE_LIBZSTD +#include +#endif + +#include "replication/basebackup_sink.h" + +#ifdef HAVE_LIBZSTD + +typedef struct bbsink_zstd +{ + /* Common information for all types of sink. */ + bbsink base; + + /* Compression level */ + int compresslevel; + + ZSTD_CCtx *cctx; + ZSTD_outBuffer zstd_outBuf; +} bbsink_zstd; + +static void bbsink_zstd_begin_backup(bbsink *sink); +static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name); +static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in); +static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len); +static void bbsink_zstd_end_archive(bbsink *sink); +static void bbsink_zstd_cleanup(bbsink *sink); +static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); + +const bbsink_ops bbsink_zstd_ops = { + .begin_backup = bbsink_zstd_begin_backup, + .begin_archive = bbsink_zstd_begin_archive, + .archive_contents = bbsink_zstd_archive_contents, + .end_archive = bbsink_zstd_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_zstd_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_zstd_end_backup, + .cleanup = bbsink_zstd_cleanup +}; +#endif + +/* + * Create a new basebackup sink that performs zstd compression using the + * designated compression level. + */ +bbsink * +bbsink_zstd_new(bbsink *next, int compresslevel) +{ +#ifndef HAVE_LIBZSTD + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("zstd compression is not supported by this build"))); + return NULL; /* keep compiler quiet */ +#else + bbsink_zstd *sink; + + Assert(next != NULL); + + if (compresslevel < 0 || compresslevel > 22) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("zstd compression level %d is out of range", + compresslevel))); + + sink = palloc0(sizeof(bbsink_zstd)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops; + sink->base.bbs_next = next; + sink->compresslevel = compresslevel; + + return &sink->base; +#endif +} + +#ifdef HAVE_LIBZSTD + +/* + * Begin backup. + */ +static void +bbsink_zstd_begin_backup(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + size_t output_buffer_bound; + + mysink->cctx = ZSTD_createCCtx(); + if (!mysink->cctx) + elog(ERROR, "could not create zstd compression context"); + + ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel, + mysink->compresslevel); + + /* + * We need our own buffer, because we're going to pass different data to + * the next sink than what gets passed to us. + */ + mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length); + + /* + * Make sure that the next sink's bbs_buffer is big enough to accommodate + * the compressed input buffer. + */ + output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length); + + /* + * The buffer length is expected to be a multiple of BLCKSZ, so round up. + */ + output_buffer_bound = output_buffer_bound + BLCKSZ - + (output_buffer_bound % BLCKSZ); + + bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound); +} + +/* + * Prepare to compress the next archive. + */ +static void +bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + char *zstd_archive_name; + + /* + * At the start of each archive we reset the state to start a new + * compression operation. The parameters are sticky and they will stick + * around as we are resetting with option ZSTD_reset_session_only. + */ + ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only); + + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + + /* Add ".zst" to the archive name. */ + zstd_archive_name = psprintf("%s.zst", archive_name); + Assert(sink->bbs_next != NULL); + bbsink_begin_archive(sink->bbs_next, zstd_archive_name); + pfree(zstd_archive_name); +} + +/* + * Compress the input data to the output buffer until we run out of input + * data. Each time the output buffer falls below the compression bound for + * the input buffer, invoke the archive_contents() method for the next sink. + * + * Note that since we're compressing the input, it may very commonly happen + * that we consume all the input data without filling the output buffer. In + * that case, the compressed representation of the current input data won't + * actually be sent to the next bbsink until a later call to this function, + * or perhaps even not until bbsink_zstd_end_archive() is invoked. + */ +static void +bbsink_zstd_archive_contents(bbsink *sink, size_t len) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the out buffer is not left with enough space, send the output + * buffer to the next sink, and reset it. + */ + if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed) + { + bbsink_archive_contents(mysink->base.bbs_next, + mysink->zstd_outBuf.pos); + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = + mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + elog(ERROR, + "could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * There might be some data inside zstd's internal buffers; we need to get that + * flushed out, also end the zstd frame and then get that forwarded to the + * successor sink as archive content. + * + * Then we can end processing for this archive. + */ +static void +bbsink_zstd_end_archive(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + size_t yet_to_flush; + + do + { + ZSTD_inBuffer in = {NULL, 0, 0}; + size_t max_needed = ZSTD_compressBound(0); + + /* + * If the out buffer is not left with enough space, send the output + * buffer to the next sink, and reset it. + */ + if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed) + { + bbsink_archive_contents(mysink->base.bbs_next, + mysink->zstd_outBuf.pos); + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = + mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mysink->cctx, + &mysink->zstd_outBuf, + &in, ZSTD_e_end); + + if (ZSTD_isError(yet_to_flush)) + elog(ERROR, "could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + + } while (yet_to_flush > 0); + + /* Make sure to pass any remaining bytes to the next sink. */ + if (mysink->zstd_outBuf.pos > 0) + bbsink_archive_contents(mysink->base.bbs_next, + mysink->zstd_outBuf.pos); + + /* Pass on the information that this archive has ended. */ + bbsink_forward_end_archive(sink); +} + +/* + * Free the resources and context. + */ +static void +bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + /* Release the context. */ + if (mysink->cctx) + { + ZSTD_freeCCtx(mysink->cctx); + mysink->cctx = NULL; + } + + bbsink_forward_end_backup(sink, endptr, endtli); +} + +/* + * Manifest contents are not compressed, but we do need to copy them into + * the successor sink's buffer, because we have our own. + */ +static void +bbsink_zstd_manifest_contents(bbsink *sink, size_t len) +{ + memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len); + bbsink_manifest_contents(sink->bbs_next, len); +} + +/* + * In case the backup fails, make sure we free any compression context that + * got allocated, so that we don't leak memory. + */ +static void +bbsink_zstd_cleanup(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + /* Release the context if not already released. */ + if (mysink->cctx) + { + ZSTD_freeCCtx(mysink->cctx); + mysink->cctx = NULL; + } +} + +#endif diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 1d0db4f9d02..0035ebcef54 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -44,6 +44,7 @@ BBOBJS = \ bbstreamer_gzip.o \ bbstreamer_inject.o \ bbstreamer_lz4.o \ + bbstreamer_zstd.o \ bbstreamer_tar.o all: pg_basebackup pg_receivewal pg_recvlogical diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h index c2de77bacc0..02d4c05df6e 100644 --- a/src/bin/pg_basebackup/bbstreamer.h +++ b/src/bin/pg_basebackup/bbstreamer.h @@ -209,6 +209,9 @@ extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next); extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel); extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next); +extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next, + int compresslevel); +extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c new file mode 100644 index 00000000000..cc68367dd56 --- /dev/null +++ b/src/bin/pg_basebackup/bbstreamer_zstd.c @@ -0,0 +1,338 @@ +/*------------------------------------------------------------------------- + * + * bbstreamer_zstd.c + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/bbstreamer_zstd.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#ifdef HAVE_LIBZSTD +#include +#endif + +#include "bbstreamer.h" +#include "common/logging.h" + +#ifdef HAVE_LIBZSTD + +typedef struct bbstreamer_zstd_frame +{ + bbstreamer base; + + ZSTD_CCtx *cctx; + ZSTD_DCtx *dctx; + ZSTD_outBuffer zstd_outBuf; +} bbstreamer_zstd_frame; + +static void bbstreamer_zstd_compressor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer); +static void bbstreamer_zstd_compressor_free(bbstreamer *streamer); + +const bbstreamer_ops bbstreamer_zstd_compressor_ops = { + .content = bbstreamer_zstd_compressor_content, + .finalize = bbstreamer_zstd_compressor_finalize, + .free = bbstreamer_zstd_compressor_free +}; + +static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer); +static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer); + +const bbstreamer_ops bbstreamer_zstd_decompressor_ops = { + .content = bbstreamer_zstd_decompressor_content, + .finalize = bbstreamer_zstd_decompressor_finalize, + .free = bbstreamer_zstd_decompressor_free +}; +#endif + +/* + * Create a new base backup streamer that performs zstd compression of tar + * blocks. + */ +bbstreamer * +bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel) +{ +#ifdef HAVE_LIBZSTD + bbstreamer_zstd_frame *streamer; + + Assert(next != NULL); + + streamer = palloc0(sizeof(bbstreamer_zstd_frame)); + + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_zstd_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->cctx = ZSTD_createCCtx(); + if (!streamer->cctx) + pg_log_error("could not create zstd compression context"); + + /* Initialize stream compression preferences */ + ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, + compresslevel); + + /* Initialize the ZSTD output buffer. */ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_log_error("this build does not support zstd compression"); + exit(1); +#endif +} + +#ifdef HAVE_LIBZSTD +/* + * Compress the input data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. In case if the output buffer + * capacity falls short of compression bound then forward the content + * of output buffer to next streamer and empty the buffer. + */ +static void +bbstreamer_zstd_compressor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + bbstreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = + ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * End-of-stream processing. + */ +static void +bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) +{ + bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + size_t yet_to_flush; + + do + { + ZSTD_inBuffer in = {NULL, 0, 0}; + size_t max_needed = ZSTD_compressBound(0); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + bbstreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + BBSTREAMER_UNKNOWN); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, + &mystreamer->zstd_outBuf, + &in, ZSTD_e_end); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + + } while (yet_to_flush > 0); + + /* Make sure to pass any remaining bytes to the next streamer. */ + if (mystreamer->zstd_outBuf.pos > 0) + bbstreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + BBSTREAMER_UNKNOWN); + + bbstreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +bbstreamer_zstd_compressor_free(bbstreamer *streamer) +{ + bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + + bbstreamer_free(streamer->bbs_next); + ZSTD_freeCCtx(mystreamer->cctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif + +/* + * Create a new base backup streamer that performs decompression of zstd + * compressed blocks. + */ +bbstreamer * +bbstreamer_zstd_decompressor_new(bbstreamer *next) +{ +#ifdef HAVE_LIBZSTD + bbstreamer_zstd_frame *streamer; + + Assert(next != NULL); + + streamer = palloc0(sizeof(bbstreamer_zstd_frame)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_zstd_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->dctx = ZSTD_createDCtx(); + if (!streamer->dctx) + { + pg_log_error("could not create zstd decompression context"); + exit(1); + } + + /* Initialize the ZSTD output buffer. */ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_log_error("this build does not support compression"); + exit(1); +#endif +} + +#ifdef HAVE_LIBZSTD +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + */ +static void +bbstreamer_zstd_decompressor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t ret; + + /* + * If output buffer is full then forward the content to next streamer + * and update the output buffer. + */ + if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size) + { + bbstreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + ret = ZSTD_decompressStream(mystreamer->dctx, + &mystreamer->zstd_outBuf, &inBuf); + + if (ZSTD_isError(ret)) + pg_log_error("could not decompress data: %s", ZSTD_getErrorName(ret)); + } +} + +/* + * End-of-stream processing. + */ +static void +bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer) +{ + bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + if (mystreamer->zstd_outBuf.pos > 0) + bbstreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + BBSTREAMER_UNKNOWN); + + bbstreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +bbstreamer_zstd_decompressor_free(bbstreamer *streamer) +{ + bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + + bbstreamer_free(streamer->bbs_next); + ZSTD_freeDCtx(mystreamer->dctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index c1ed7aeeee1..9f3ecc60fbe 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -405,8 +405,9 @@ usage(void) printf(_(" -X, --wal-method=none|fetch|stream\n" " include required WAL files with specified method\n")); printf(_(" -z, --gzip compress tar output\n")); - printf(_(" -Z, --compress={[{client,server}-]gzip,lz4,none}[:LEVEL] or [LEVEL]\n" + printf(_(" -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL]\n" " compress tar output with given compression method or level\n")); + printf(_(" -Z, --compress=none do not compress tar output\n")); printf(_("\nGeneral options:\n")); printf(_(" -c, --checkpoint=fast|spread\n" " set fast or spread checkpointing\n")); @@ -1067,6 +1068,21 @@ parse_compress_options(char *src, WalCompressionMethod *methodres, *methodres = COMPRESSION_LZ4; *locationres = COMPRESS_LOCATION_SERVER; } + else if (pg_strcasecmp(firstpart, "zstd") == 0) + { + *methodres = COMPRESSION_ZSTD; + *locationres = COMPRESS_LOCATION_UNSPECIFIED; + } + else if (pg_strcasecmp(firstpart, "client-zstd") == 0) + { + *methodres = COMPRESSION_ZSTD; + *locationres = COMPRESS_LOCATION_CLIENT; + } + else if (pg_strcasecmp(firstpart, "server-zstd") == 0) + { + *methodres = COMPRESSION_ZSTD; + *locationres = COMPRESS_LOCATION_SERVER; + } else if (pg_strcasecmp(firstpart, "none") == 0) { *methodres = COMPRESSION_NONE; @@ -1191,7 +1207,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, bool inject_manifest; bool is_tar, is_tar_gz, - is_tar_lz4; + is_tar_lz4, + is_tar_zstd; bool must_parse_archive; int archive_name_len = strlen(archive_name); @@ -1214,6 +1231,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation, is_tar_lz4 = (archive_name_len > 8 && strcmp(archive_name + archive_name_len - 4, ".lz4") == 0); + /* Is this a ZSTD archive? */ + is_tar_zstd = (archive_name_len > 8 && + strcmp(archive_name + archive_name_len - 4, ".zst") == 0); + /* * We have to parse the archive if (1) we're suppose to extract it, or if * (2) we need to inject backup_manifest or recovery configuration into it. @@ -1223,7 +1244,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, (spclocation == NULL && writerecoveryconf)); /* At present, we only know how to parse tar archives. */ - if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4) + if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4 + && !is_tar_zstd) { pg_log_error("unable to parse archive: %s", archive_name); pg_log_info("only tar archives can be parsed"); @@ -1295,6 +1317,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation, streamer = bbstreamer_lz4_compressor_new(streamer, compresslevel); } + else if (compressmethod == COMPRESSION_ZSTD) + { + strlcat(archive_filename, ".zst", sizeof(archive_filename)); + streamer = bbstreamer_plain_writer_new(archive_filename, + archive_file); + streamer = bbstreamer_zstd_compressor_new(streamer, + compresslevel); + } else { Assert(false); /* not reachable */ @@ -1353,6 +1383,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, streamer = bbstreamer_gzip_decompressor_new(streamer); else if (compressmethod == COMPRESSION_LZ4) streamer = bbstreamer_lz4_decompressor_new(streamer); + else if (compressmethod == COMPRESSION_ZSTD) + streamer = bbstreamer_zstd_decompressor_new(streamer); } /* Return the results. */ @@ -2020,6 +2052,9 @@ BaseBackup(void) case COMPRESSION_LZ4: compressmethodstr = "lz4"; break; + case COMPRESSION_ZSTD: + compressmethodstr = "zstd"; + break; default: Assert(false); break; @@ -2869,6 +2904,14 @@ main(int argc, char **argv) exit(1); } break; + case COMPRESSION_ZSTD: + if (compresslevel > 22) + { + pg_log_error("compression level %d of method %s higher than maximum of 22", + compresslevel, "zstd"); + exit(1); + } + break; } /* diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index ce661a9ce45..8a4c2b89646 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -904,6 +904,10 @@ main(int argc, char **argv) exit(1); #endif break; + case COMPRESSION_ZSTD: + pg_log_error("compression with %s is not yet supported", "ZSTD"); + exit(1); + } diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index 2dfb353baad..ec54019cfc3 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -24,6 +24,7 @@ typedef enum { COMPRESSION_GZIP, COMPRESSION_LZ4, + COMPRESSION_ZSTD, COMPRESSION_NONE } WalCompressionMethod; diff --git a/src/bin/pg_verifybackup/Makefile b/src/bin/pg_verifybackup/Makefile index 851233a6e0e..596df15118b 100644 --- a/src/bin/pg_verifybackup/Makefile +++ b/src/bin/pg_verifybackup/Makefile @@ -10,6 +10,7 @@ export TAR # name. export GZIP_PROGRAM=$(GZIP) export LZ4=$(LZ4) +export ZSTD=$(ZSTD) subdir = src/bin/pg_verifybackup top_builddir = ../../.. diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl index 383203d0b86..efbc910dfbb 100644 --- a/src/bin/pg_verifybackup/t/008_untar.pl +++ b/src/bin/pg_verifybackup/t/008_untar.pl @@ -42,6 +42,14 @@ my @test_configuration = ( 'decompress_program' => $ENV{'LZ4'}, 'decompress_flags' => [ '-d', '-m'], 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'server-zstd'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); @@ -107,6 +115,7 @@ for my $tc (@test_configuration) # Cleanup. unlink($backup_path . '/backup_manifest'); unlink($backup_path . '/base.tar'); + unlink($backup_path . '/' . $tc->{'backup_archive'}); rmtree($extract_path); } } diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl index c51cdf79f8c..d30ba01742a 100644 --- a/src/bin/pg_verifybackup/t/009_extract.pl +++ b/src/bin/pg_verifybackup/t/009_extract.pl @@ -31,6 +31,11 @@ my @test_configuration = ( 'compression_method' => 'lz4', 'backup_flags' => ['--compress', 'server-lz4:5'], 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'server-zstd:5'], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl index 36165293908..c2a6161be68 100644 --- a/src/bin/pg_verifybackup/t/010_client_untar.pl +++ b/src/bin/pg_verifybackup/t/010_client_untar.pl @@ -42,6 +42,14 @@ my @test_configuration = ( 'decompress_flags' => [ '-d' ], 'output_file' => 'base.tar', 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'client-zstd:5'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index a3f8d372582..a7f16758a42 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -285,6 +285,7 @@ extern void bbsink_forward_cleanup(bbsink *sink); extern bbsink *bbsink_copystream_new(bool send_to_client); extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel); extern bbsink *bbsink_lz4_new(bbsink *next, int compresslevel); +extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_server_new(bbsink *next, char *pathname); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 105f5c72a2d..441d6ae6bfc 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -380,6 +380,7 @@ sub mkvcbuild $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c'); + $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_zstd.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c'); $pgbasebackup->AddLibrary('ws2_32.lib');