1
0
mirror of https://github.com/postgres/postgres.git synced 2025-12-09 02:08:45 +03:00
Files
postgres/src/fe_utils/astreamer_lz4.c
Michael Paquier 3369a3b49b Fix bug in archive streamer with LZ4 decompression
When decompressing some input data, the calculation for the initial
starting point and the initial size were incorrect, potentially leading
to failures when decompressing contents with LZ4.  These initialization
points are fixed in this commit, bringing the logic closer to what
exists for gzip and zstd.

The contents of the compressed data is clear (for example backups taken
with LZ4 can still be decompressed with a "lz4" command), only the
decompression part reading the input data was impacted by this issue.

This code path impacts pg_basebackup and pg_verifybackup, which can use
the LZ4 decompression routines with an archive streamer, or any tools
that try to use the archive streamers in src/fe_utils/.

The issue is easier to reproduce with files that have a low-compression
rate, like ones filled with random data, for a size of at least 512kB,
but this could happen with anything as long as it is stored in a data
folder.  Some tests are added based on this idea, with a file filled
with random bytes grabbed from the backend, written at the root of the
data folder.  This is proving good enough to reproduce the original
problem.

Author: Mikhail Gribkov <youzhick@gmail.com>
Discussion: https://postgr.es/m/CAMEv5_uQS1Hg6KCaEP2JkrTBbZ-nXQhxomWrhYQvbdzR-zy-wA@mail.gmail.com
Backpatch-through: 15
2025-07-02 13:48:36 +09:00

425 lines
12 KiB
C

/*-------------------------------------------------------------------------
*
* astreamer_lz4.c
*
* Archive streamers that deal with data compressed using lz4.
* astreamer_lz4_compressor applies lz4 compression to the input stream,
* and astreamer_lz4_decompressor does the reverse.
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/fe_utils/astreamer_lz4.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#ifdef USE_LZ4
#include <lz4frame.h>
#endif
#include "common/logging.h"
#include "fe_utils/astreamer.h"
#ifdef USE_LZ4
typedef struct astreamer_lz4_frame
{
astreamer base;
LZ4F_compressionContext_t cctx;
LZ4F_decompressionContext_t dctx;
LZ4F_preferences_t prefs;
size_t bytes_written;
bool header_written;
} astreamer_lz4_frame;
static void astreamer_lz4_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_lz4_compressor_finalize(astreamer *streamer);
static void astreamer_lz4_compressor_free(astreamer *streamer);
static const astreamer_ops astreamer_lz4_compressor_ops = {
.content = astreamer_lz4_compressor_content,
.finalize = astreamer_lz4_compressor_finalize,
.free = astreamer_lz4_compressor_free
};
static void astreamer_lz4_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
static void astreamer_lz4_decompressor_free(astreamer *streamer);
static const astreamer_ops astreamer_lz4_decompressor_ops = {
.content = astreamer_lz4_decompressor_content,
.finalize = astreamer_lz4_decompressor_finalize,
.free = astreamer_lz4_decompressor_free
};
#endif
/*
* Create a new base backup streamer that performs lz4 compression of tar
* blocks.
*/
astreamer *
astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_LZ4
astreamer_lz4_frame *streamer;
LZ4F_errorCode_t ctxError;
LZ4F_preferences_t *prefs;
Assert(next != NULL);
streamer = palloc0(sizeof(astreamer_lz4_frame));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_lz4_compressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
streamer->header_written = false;
/* Initialize stream compression preferences */
prefs = &streamer->prefs;
memset(prefs, 0, sizeof(LZ4F_preferences_t));
prefs->frameInfo.blockSizeID = LZ4F_max256KB;
prefs->compressionLevel = compress->level;
ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
if (LZ4F_isError(ctxError))
pg_log_error("could not create lz4 compression context: %s",
LZ4F_getErrorName(ctxError));
return &streamer->base;
#else
pg_fatal("this build does not support compression with %s", "LZ4");
return NULL; /* keep compiler quiet */
#endif
}
#ifdef USE_LZ4
/*
* Compress the input data to output buffer.
*
* Find out the compression bound based on input data length for each
* invocation to make sure that output buffer has enough capacity to
* accommodate the compressed data. In case if the output buffer
* capacity falls short of compression bound then forward the content
* of output buffer to next streamer and empty the buffer.
*/
static void
astreamer_lz4_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
astreamer_lz4_frame *mystreamer;
uint8 *next_in,
*next_out;
size_t out_bound,
compressed_size,
avail_out;
mystreamer = (astreamer_lz4_frame *) streamer;
next_in = (uint8 *) data;
/* Write header before processing the first input chunk. */
if (!mystreamer->header_written)
{
compressed_size = LZ4F_compressBegin(mystreamer->cctx,
(uint8 *) mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
&mystreamer->prefs);
if (LZ4F_isError(compressed_size))
pg_log_error("could not write lz4 header: %s",
LZ4F_getErrorName(compressed_size));
mystreamer->bytes_written += compressed_size;
mystreamer->header_written = true;
}
/*
* Update the offset and capacity of output buffer based on number of
* bytes written to output buffer.
*/
next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
/*
* Find out the compression bound and make sure that output buffer has the
* required capacity for the success of LZ4F_compressUpdate. If needed
* forward the content to next streamer and empty the buffer.
*/
out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
if (avail_out < out_bound)
{
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
context);
/* Enlarge buffer if it falls short of out bound. */
if (mystreamer->base.bbs_buffer.maxlen < out_bound)
enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
avail_out = mystreamer->base.bbs_buffer.maxlen;
mystreamer->bytes_written = 0;
next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
}
/*
* This call compresses the data starting at next_in and generates the
* output starting at next_out. It expects the caller to provide the size
* of input buffer and capacity of output buffer by providing parameters
* len and avail_out.
*
* It returns the number of bytes compressed to output buffer.
*/
compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
next_out, avail_out,
next_in, len, NULL);
if (LZ4F_isError(compressed_size))
pg_log_error("could not compress data: %s",
LZ4F_getErrorName(compressed_size));
mystreamer->bytes_written += compressed_size;
}
/*
* End-of-stream processing.
*/
static void
astreamer_lz4_compressor_finalize(astreamer *streamer)
{
astreamer_lz4_frame *mystreamer;
uint8 *next_out;
size_t footer_bound,
compressed_size,
avail_out;
mystreamer = (astreamer_lz4_frame *) streamer;
/* Find out the footer bound and update the output buffer. */
footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
footer_bound)
{
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
ASTREAMER_UNKNOWN);
/* Enlarge buffer if it falls short of footer bound. */
if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
avail_out = mystreamer->base.bbs_buffer.maxlen;
mystreamer->bytes_written = 0;
next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
}
else
{
next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
}
/*
* Finalize the frame and flush whatever data remaining in compression
* context.
*/
compressed_size = LZ4F_compressEnd(mystreamer->cctx,
next_out, avail_out, NULL);
if (LZ4F_isError(compressed_size))
pg_log_error("could not end lz4 compression: %s",
LZ4F_getErrorName(compressed_size));
mystreamer->bytes_written += compressed_size;
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
ASTREAMER_UNKNOWN);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
astreamer_lz4_compressor_free(astreamer *streamer)
{
astreamer_lz4_frame *mystreamer;
mystreamer = (astreamer_lz4_frame *) streamer;
astreamer_free(streamer->bbs_next);
LZ4F_freeCompressionContext(mystreamer->cctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
}
#endif
/*
* Create a new base backup streamer that performs decompression of lz4
* compressed blocks.
*/
astreamer *
astreamer_lz4_decompressor_new(astreamer *next)
{
#ifdef USE_LZ4
astreamer_lz4_frame *streamer;
LZ4F_errorCode_t ctxError;
Assert(next != NULL);
streamer = palloc0(sizeof(astreamer_lz4_frame));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_lz4_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
/* Initialize internal stream state for decompression */
ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
if (LZ4F_isError(ctxError))
pg_fatal("could not initialize compression library: %s",
LZ4F_getErrorName(ctxError));
return &streamer->base;
#else
pg_fatal("this build does not support compression with %s", "LZ4");
return NULL; /* keep compiler quiet */
#endif
}
#ifdef USE_LZ4
/*
* Decompress the input data to output buffer until we run out of input
* data. Each time the output buffer is full, pass on the decompressed data
* to the next streamer.
*/
static void
astreamer_lz4_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
astreamer_lz4_frame *mystreamer;
uint8 *next_in,
*next_out;
size_t avail_in,
avail_out;
mystreamer = (astreamer_lz4_frame *) streamer;
next_in = (uint8 *) data;
next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
avail_in = len;
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
while (avail_in > 0)
{
size_t ret,
read_size,
out_size;
read_size = avail_in;
out_size = avail_out;
/*
* This call decompresses the data starting at next_in and generates
* the output data starting at next_out. It expects the caller to
* provide size of the input buffer and total capacity of the output
* buffer by providing the read_size and out_size parameters
* respectively.
*
* Per the documentation of LZ4, parameters read_size and out_size
* behaves as dual parameters. On return, the number of bytes consumed
* from the input buffer will be written back to read_size and the
* number of bytes decompressed to output buffer will be written back
* to out_size respectively.
*/
ret = LZ4F_decompress(mystreamer->dctx,
next_out, &out_size,
next_in, &read_size, NULL);
if (LZ4F_isError(ret))
pg_log_error("could not decompress data: %s",
LZ4F_getErrorName(ret));
/* Update input buffer based on number of bytes consumed */
avail_in -= read_size;
next_in += read_size;
mystreamer->bytes_written += out_size;
/*
* If output buffer is full then forward the content to next streamer
* and update the output buffer.
*/
if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
{
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
context);
avail_out = mystreamer->base.bbs_buffer.maxlen;
mystreamer->bytes_written = 0;
next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
}
else
{
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
next_out += mystreamer->bytes_written;
}
}
}
/*
* End-of-stream processing.
*/
static void
astreamer_lz4_decompressor_finalize(astreamer *streamer)
{
astreamer_lz4_frame *mystreamer;
mystreamer = (astreamer_lz4_frame *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
ASTREAMER_UNKNOWN);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
astreamer_lz4_decompressor_free(astreamer *streamer)
{
astreamer_lz4_frame *mystreamer;
mystreamer = (astreamer_lz4_frame *) streamer;
astreamer_free(streamer->bbs_next);
LZ4F_freeDecompressionContext(mystreamer->dctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
}
#endif