mirror of
https://github.com/postgres/postgres.git
synced 2025-05-28 05:21:27 +03:00
Prepare pg_dump internals for additional compression methods
Commit bf9aa490db introduced a compression API in compress_io.{c,h} to make reuse easier, and allow adding more compression algorithms. However, pg_backup_archiver.c was not switched to this API and continued to call the compression directly. This commit teaches pg_backup_archiver.c about the compression API, so that it can benefit from bf9aa490db (simpler code, easier addition of new compression methods). Author: Georgios Kokolatos Reviewed-by: Michael Paquier, Rachel Heaton, Justin Pryzby, Tomas Vondra Discussion: https://postgr.es/m/faUNEOpts9vunEaLnmxmG-DldLSg_ql137OC3JYDmgrOMHm1RvvWY2IdBkv_CRxm5spCCb_OmKNk2T03TMm0fBEWveFF9wA1WizPuAgB7Ss%3D%40protonmail.com
This commit is contained in:
parent
009eeee746
commit
03d02f54a6
@ -56,6 +56,41 @@
|
||||
#include "compress_io.h"
|
||||
#include "pg_backup_utils.h"
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
/*----------------------
|
||||
* Generic functions
|
||||
*----------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Checks whether a compression algorithm is supported.
|
||||
*
|
||||
* On success returns NULL, otherwise returns a malloc'ed string which can be
|
||||
* used by the caller in an error message.
|
||||
*/
|
||||
char *
|
||||
supports_compression(const pg_compress_specification compression_spec)
|
||||
{
|
||||
const pg_compress_algorithm algorithm = compression_spec.algorithm;
|
||||
bool supported = false;
|
||||
|
||||
if (algorithm == PG_COMPRESSION_NONE)
|
||||
supported = true;
|
||||
#ifdef HAVE_LIBZ
|
||||
if (algorithm == PG_COMPRESSION_GZIP)
|
||||
supported = true;
|
||||
#endif
|
||||
|
||||
if (!supported)
|
||||
return psprintf("this build does not support compression with %s",
|
||||
get_compress_algorithm_name(algorithm));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*----------------------
|
||||
* Compressor API
|
||||
*----------------------
|
||||
@ -490,16 +525,19 @@ cfopen_write(const char *path, const char *mode,
|
||||
}
|
||||
|
||||
/*
|
||||
* Opens file 'path' in 'mode'. If compression is GZIP, the file
|
||||
* is opened with libz gzopen(), otherwise with plain fopen().
|
||||
* This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or
|
||||
* associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The
|
||||
* descriptor is not dup'ed and it is the caller's responsibility to do so.
|
||||
* The caller must verify that the 'compress_algorithm' is supported by the
|
||||
* current build.
|
||||
*
|
||||
* On failure, return NULL with an error code in errno.
|
||||
*/
|
||||
cfp *
|
||||
cfopen(const char *path, const char *mode,
|
||||
const pg_compress_specification compression_spec)
|
||||
static cfp *
|
||||
cfopen_internal(const char *path, int fd, const char *mode,
|
||||
pg_compress_specification compression_spec)
|
||||
{
|
||||
cfp *fp = pg_malloc(sizeof(cfp));
|
||||
cfp *fp = pg_malloc0(sizeof(cfp));
|
||||
|
||||
if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
|
||||
{
|
||||
@ -511,15 +549,20 @@ cfopen(const char *path, const char *mode,
|
||||
|
||||
snprintf(mode_compression, sizeof(mode_compression), "%s%d",
|
||||
mode, compression_spec.level);
|
||||
fp->compressedfp = gzopen(path, mode_compression);
|
||||
if (fd >= 0)
|
||||
fp->compressedfp = gzdopen(fd, mode_compression);
|
||||
else
|
||||
fp->compressedfp = gzopen(path, mode_compression);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* don't specify a level, just use the zlib default */
|
||||
fp->compressedfp = gzopen(path, mode);
|
||||
if (fd >= 0)
|
||||
fp->compressedfp = gzdopen(fd, mode);
|
||||
else
|
||||
fp->compressedfp = gzopen(path, mode);
|
||||
}
|
||||
|
||||
fp->uncompressedfp = NULL;
|
||||
if (fp->compressedfp == NULL)
|
||||
{
|
||||
free_keep_errno(fp);
|
||||
@ -531,10 +574,11 @@ cfopen(const char *path, const char *mode,
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef HAVE_LIBZ
|
||||
fp->compressedfp = NULL;
|
||||
#endif
|
||||
fp->uncompressedfp = fopen(path, mode);
|
||||
if (fd >= 0)
|
||||
fp->uncompressedfp = fdopen(fd, mode);
|
||||
else
|
||||
fp->uncompressedfp = fopen(path, mode);
|
||||
|
||||
if (fp->uncompressedfp == NULL)
|
||||
{
|
||||
free_keep_errno(fp);
|
||||
@ -545,6 +589,33 @@ cfopen(const char *path, const char *mode,
|
||||
return fp;
|
||||
}
|
||||
|
||||
/*
|
||||
* Opens file 'path' in 'mode' and compression as defined in
|
||||
* compression_spec. The caller must verify that the compression
|
||||
* is supported by the current build.
|
||||
*
|
||||
* On failure, return NULL with an error code in errno.
|
||||
*/
|
||||
cfp *
|
||||
cfopen(const char *path, const char *mode,
|
||||
const pg_compress_specification compression_spec)
|
||||
{
|
||||
return cfopen_internal(path, -1, mode, compression_spec);
|
||||
}
|
||||
|
||||
/*
|
||||
* Associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'
|
||||
* and compression as defined in compression_spec. The caller must
|
||||
* verify that the compression is supported by the current build.
|
||||
*
|
||||
* On failure, return NULL with an error code in errno.
|
||||
*/
|
||||
cfp *
|
||||
cfdopen(int fd, const char *mode,
|
||||
const pg_compress_specification compression_spec)
|
||||
{
|
||||
return cfopen_internal(NULL, fd, mode, compression_spec);
|
||||
}
|
||||
|
||||
int
|
||||
cfread(void *ptr, int size, cfp *fp)
|
||||
|
@ -21,6 +21,8 @@
|
||||
#define ZLIB_OUT_SIZE 4096
|
||||
#define ZLIB_IN_SIZE 4096
|
||||
|
||||
extern char *supports_compression(const pg_compress_specification compression_spec);
|
||||
|
||||
/* Prototype for callback function to WriteDataToArchive() */
|
||||
typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len);
|
||||
|
||||
@ -54,6 +56,8 @@ typedef struct cfp cfp;
|
||||
|
||||
extern cfp *cfopen(const char *path, const char *mode,
|
||||
const pg_compress_specification compression_spec);
|
||||
extern cfp *cfdopen(int fd, const char *mode,
|
||||
const pg_compress_specification compression_spec);
|
||||
extern cfp *cfopen_read(const char *path, const char *mode);
|
||||
extern cfp *cfopen_write(const char *path, const char *mode,
|
||||
const pg_compress_specification compression_spec);
|
||||
|
@ -31,6 +31,7 @@
|
||||
#endif
|
||||
|
||||
#include "common/string.h"
|
||||
#include "compress_io.h"
|
||||
#include "dumputils.h"
|
||||
#include "fe_utils/string_utils.h"
|
||||
#include "lib/stringinfo.h"
|
||||
@ -43,13 +44,6 @@
|
||||
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
|
||||
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
|
||||
|
||||
/* state needed to save/restore an archive's output target */
|
||||
typedef struct _outputContext
|
||||
{
|
||||
void *OF;
|
||||
int gzOut;
|
||||
} OutputContext;
|
||||
|
||||
/*
|
||||
* State for tracking TocEntrys that are ready to process during a parallel
|
||||
* restore. (This used to be a list, and we still call it that, though now
|
||||
@ -101,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH);
|
||||
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
|
||||
static void SetOutput(ArchiveHandle *AH, const char *filename,
|
||||
const pg_compress_specification compression_spec);
|
||||
static OutputContext SaveOutput(ArchiveHandle *AH);
|
||||
static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
|
||||
static cfp *SaveOutput(ArchiveHandle *AH);
|
||||
static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput);
|
||||
|
||||
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
|
||||
static void restore_toc_entries_prefork(ArchiveHandle *AH,
|
||||
@ -277,11 +271,8 @@ CloseArchive(Archive *AHX)
|
||||
AH->ClosePtr(AH);
|
||||
|
||||
/* Close the output */
|
||||
errno = 0; /* in case gzclose() doesn't set it */
|
||||
if (AH->gzOut)
|
||||
res = GZCLOSE(AH->OF);
|
||||
else if (AH->OF != stdout)
|
||||
res = fclose(AH->OF);
|
||||
errno = 0;
|
||||
res = cfclose(AH->OF);
|
||||
|
||||
if (res != 0)
|
||||
pg_fatal("could not close output file: %m");
|
||||
@ -363,7 +354,7 @@ RestoreArchive(Archive *AHX)
|
||||
RestoreOptions *ropt = AH->public.ropt;
|
||||
bool parallel_mode;
|
||||
TocEntry *te;
|
||||
OutputContext sav;
|
||||
cfp *sav;
|
||||
|
||||
AH->stage = STAGE_INITIALIZING;
|
||||
|
||||
@ -391,17 +382,21 @@ RestoreArchive(Archive *AHX)
|
||||
/*
|
||||
* Make sure we won't need (de)compression we haven't got
|
||||
*/
|
||||
#ifndef HAVE_LIBZ
|
||||
if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP &&
|
||||
AH->PrintTocDataPtr != NULL)
|
||||
if (AH->PrintTocDataPtr != NULL)
|
||||
{
|
||||
for (te = AH->toc->next; te != AH->toc; te = te->next)
|
||||
{
|
||||
if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
|
||||
pg_fatal("cannot restore from compressed archive (compression not supported in this installation)");
|
||||
{
|
||||
char *errmsg = supports_compression(AH->compression_spec);
|
||||
if (errmsg)
|
||||
pg_fatal("cannot restore from compressed archive (%s)",
|
||||
errmsg);
|
||||
else
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Prepare index arrays, so we can assume we have them throughout restore.
|
||||
@ -1133,7 +1128,7 @@ PrintTOCSummary(Archive *AHX)
|
||||
TocEntry *te;
|
||||
pg_compress_specification out_compression_spec = {0};
|
||||
teSection curSection;
|
||||
OutputContext sav;
|
||||
cfp *sav;
|
||||
const char *fmtName;
|
||||
char stamp_str[64];
|
||||
|
||||
@ -1508,58 +1503,32 @@ static void
|
||||
SetOutput(ArchiveHandle *AH, const char *filename,
|
||||
const pg_compress_specification compression_spec)
|
||||
{
|
||||
int fn;
|
||||
const char *mode;
|
||||
int fn = -1;
|
||||
|
||||
if (filename)
|
||||
{
|
||||
if (strcmp(filename, "-") == 0)
|
||||
fn = fileno(stdout);
|
||||
else
|
||||
fn = -1;
|
||||
}
|
||||
else if (AH->FH)
|
||||
fn = fileno(AH->FH);
|
||||
else if (AH->fSpec)
|
||||
{
|
||||
fn = -1;
|
||||
filename = AH->fSpec;
|
||||
}
|
||||
else
|
||||
fn = fileno(stdout);
|
||||
|
||||
/* If compression explicitly requested, use gzopen */
|
||||
#ifdef HAVE_LIBZ
|
||||
if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
|
||||
{
|
||||
char fmode[14];
|
||||
|
||||
/* Don't use PG_BINARY_x since this is zlib */
|
||||
sprintf(fmode, "wb%d", compression_spec.level);
|
||||
if (fn >= 0)
|
||||
AH->OF = gzdopen(dup(fn), fmode);
|
||||
else
|
||||
AH->OF = gzopen(filename, fmode);
|
||||
AH->gzOut = 1;
|
||||
}
|
||||
if (AH->mode == archModeAppend)
|
||||
mode = PG_BINARY_A;
|
||||
else
|
||||
#endif
|
||||
{ /* Use fopen */
|
||||
if (AH->mode == archModeAppend)
|
||||
{
|
||||
if (fn >= 0)
|
||||
AH->OF = fdopen(dup(fn), PG_BINARY_A);
|
||||
else
|
||||
AH->OF = fopen(filename, PG_BINARY_A);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (fn >= 0)
|
||||
AH->OF = fdopen(dup(fn), PG_BINARY_W);
|
||||
else
|
||||
AH->OF = fopen(filename, PG_BINARY_W);
|
||||
}
|
||||
AH->gzOut = 0;
|
||||
}
|
||||
mode = PG_BINARY_W;
|
||||
|
||||
if (fn >= 0)
|
||||
AH->OF = cfdopen(dup(fn), mode, compression_spec);
|
||||
else
|
||||
AH->OF = cfopen(filename, mode, compression_spec);
|
||||
|
||||
if (!AH->OF)
|
||||
{
|
||||
@ -1570,33 +1539,24 @@ SetOutput(ArchiveHandle *AH, const char *filename,
|
||||
}
|
||||
}
|
||||
|
||||
static OutputContext
|
||||
static cfp *
|
||||
SaveOutput(ArchiveHandle *AH)
|
||||
{
|
||||
OutputContext sav;
|
||||
|
||||
sav.OF = AH->OF;
|
||||
sav.gzOut = AH->gzOut;
|
||||
|
||||
return sav;
|
||||
return (cfp *) AH->OF;
|
||||
}
|
||||
|
||||
static void
|
||||
RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
|
||||
RestoreOutput(ArchiveHandle *AH, cfp *savedOutput)
|
||||
{
|
||||
int res;
|
||||
|
||||
errno = 0; /* in case gzclose() doesn't set it */
|
||||
if (AH->gzOut)
|
||||
res = GZCLOSE(AH->OF);
|
||||
else
|
||||
res = fclose(AH->OF);
|
||||
errno = 0;
|
||||
res = cfclose(AH->OF);
|
||||
|
||||
if (res != 0)
|
||||
pg_fatal("could not close output file: %m");
|
||||
|
||||
AH->gzOut = savedContext.gzOut;
|
||||
AH->OF = savedContext.OF;
|
||||
AH->OF = savedOutput;
|
||||
}
|
||||
|
||||
|
||||
@ -1720,22 +1680,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
|
||||
|
||||
bytes_written = size * nmemb;
|
||||
}
|
||||
else if (AH->gzOut)
|
||||
bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
|
||||
else if (AH->CustomOutPtr)
|
||||
bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
|
||||
|
||||
/*
|
||||
* If we're doing a restore, and it's direct to DB, and we're connected
|
||||
* then send it to the DB.
|
||||
*/
|
||||
else if (RestoringToDB(AH))
|
||||
bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
|
||||
else
|
||||
{
|
||||
/*
|
||||
* If we're doing a restore, and it's direct to DB, and we're
|
||||
* connected then send it to the DB.
|
||||
*/
|
||||
if (RestoringToDB(AH))
|
||||
bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
|
||||
else
|
||||
bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
|
||||
}
|
||||
bytes_written = cfwrite(ptr, size * nmemb, AH->OF);
|
||||
|
||||
if (bytes_written != size * nmemb)
|
||||
WRITE_ERROR_EXIT;
|
||||
@ -2224,6 +2179,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
|
||||
SetupWorkerPtrType setupWorkerPtr)
|
||||
{
|
||||
ArchiveHandle *AH;
|
||||
pg_compress_specification out_compress_spec = {0};
|
||||
|
||||
pg_log_debug("allocating AH for %s, format %d",
|
||||
FileSpec ? FileSpec : "(stdio)", fmt);
|
||||
@ -2277,8 +2233,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
|
||||
memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
|
||||
|
||||
/* Open stdout with no compression for AH output handle */
|
||||
AH->gzOut = 0;
|
||||
AH->OF = stdout;
|
||||
out_compress_spec.algorithm = PG_COMPRESSION_NONE;
|
||||
AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec);
|
||||
|
||||
/*
|
||||
* On Windows, we need to use binary mode to read/write non-text files,
|
||||
@ -3712,6 +3668,7 @@ WriteHead(ArchiveHandle *AH)
|
||||
void
|
||||
ReadHead(ArchiveHandle *AH)
|
||||
{
|
||||
char *errmsg;
|
||||
char vmaj,
|
||||
vmin,
|
||||
vrev;
|
||||
@ -3781,10 +3738,13 @@ ReadHead(ArchiveHandle *AH)
|
||||
else
|
||||
AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
|
||||
|
||||
#ifndef HAVE_LIBZ
|
||||
if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
|
||||
pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
|
||||
#endif
|
||||
errmsg = supports_compression(AH->compression_spec);
|
||||
if (errmsg)
|
||||
{
|
||||
pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
|
||||
errmsg);
|
||||
pg_free(errmsg);
|
||||
}
|
||||
|
||||
if (AH->version >= K_VERS_1_4)
|
||||
{
|
||||
|
@ -32,30 +32,6 @@
|
||||
|
||||
#define LOBBUFSIZE 16384
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
#include <zlib.h>
|
||||
#define GZCLOSE(fh) gzclose(fh)
|
||||
#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s))
|
||||
#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s))
|
||||
#define GZEOF(fh) gzeof(fh)
|
||||
#else
|
||||
#define GZCLOSE(fh) fclose(fh)
|
||||
#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
|
||||
#define GZREAD(p, s, n, fh) fread(p, s, n, fh)
|
||||
#define GZEOF(fh) feof(fh)
|
||||
/* this is just the redefinition of a libz constant */
|
||||
#define Z_DEFAULT_COMPRESSION (-1)
|
||||
|
||||
typedef struct _z_stream
|
||||
{
|
||||
void *next_in;
|
||||
void *next_out;
|
||||
size_t avail_in;
|
||||
size_t avail_out;
|
||||
} z_stream;
|
||||
typedef z_stream *z_streamp;
|
||||
#endif
|
||||
|
||||
/* Data block types */
|
||||
#define BLK_DATA 1
|
||||
#define BLK_BLOBS 3
|
||||
@ -319,8 +295,7 @@ struct _archiveHandle
|
||||
|
||||
char *fSpec; /* Archive File Spec */
|
||||
FILE *FH; /* General purpose file handle */
|
||||
void *OF;
|
||||
int gzOut; /* Output file */
|
||||
void *OF; /* Output file */
|
||||
|
||||
struct _tocEntry *toc; /* Header of circular list of TOC entries */
|
||||
int tocCount; /* Number of TOC entries */
|
||||
|
Loading…
x
Reference in New Issue
Block a user