1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-15 05:46:52 +03:00

Introduce a generic pg_dump compression API

Switch pg_dump to use the Compression API, implemented by bf9aa490db.

The CompressFileHandle replaces the cfp* family of functions with a
struct of callbacks for accessing (compressed) files. This allows adding
new compression methods simply by introducing a new struct instance with
appropriate implementation of the callbacks.

Archives compressed using custom compression methods store an identifier
of the compression algorithm in their header instead of the compression
level. The header version is bumped.

Author: Georgios Kokolatos
Reviewed-by: Michael Paquier, Rachel Heaton, Justin Pryzby, Tomas Vondra
Discussion: https://postgr.es/m/faUNEOpts9vunEaLnmxmG-DldLSg_ql137OC3JYDmgrOMHm1RvvWY2IdBkv_CRxm5spCCb_OmKNk2T03TMm0fBEWveFF9wA1WizPuAgB7Ss%3D%40protonmail.com
This commit is contained in:
Tomas Vondra
2023-02-23 18:33:30 +01:00
parent 739f1d6218
commit e9960732a9
16 changed files with 1090 additions and 791 deletions

View File

@@ -95,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH);
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static void SetOutput(ArchiveHandle *AH, const char *filename,
const pg_compress_specification compression_spec);
static cfp *SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput);
static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
static void restore_toc_entries_prefork(ArchiveHandle *AH,
@@ -272,7 +272,7 @@ CloseArchive(Archive *AHX)
/* Close the output */
errno = 0;
res = cfclose(AH->OF);
res = EndCompressFileHandle(AH->OF);
if (res != 0)
pg_fatal("could not close output file: %m");
@@ -354,7 +354,7 @@ RestoreArchive(Archive *AHX)
RestoreOptions *ropt = AH->public.ropt;
bool parallel_mode;
TocEntry *te;
cfp *sav;
CompressFileHandle *sav;
AH->stage = STAGE_INITIALIZING;
@@ -1128,7 +1128,7 @@ PrintTOCSummary(Archive *AHX)
TocEntry *te;
pg_compress_specification out_compression_spec = {0};
teSection curSection;
cfp *sav;
CompressFileHandle *sav;
const char *fmtName;
char stamp_str[64];
@@ -1144,9 +1144,10 @@ PrintTOCSummary(Archive *AHX)
strcpy(stamp_str, "[unknown]");
ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
sanitize_line(AH->archdbname, false),
AH->tocCount, AH->compression_spec.level);
AH->tocCount,
get_compress_algorithm_name(AH->compression_spec.algorithm));
switch (AH->format)
{
@@ -1503,6 +1504,7 @@ static void
SetOutput(ArchiveHandle *AH, const char *filename,
const pg_compress_specification compression_spec)
{
CompressFileHandle *CFH;
const char *mode;
int fn = -1;
@@ -1525,33 +1527,32 @@ SetOutput(ArchiveHandle *AH, const char *filename,
else
mode = PG_BINARY_W;
if (fn >= 0)
AH->OF = cfdopen(dup(fn), mode, compression_spec);
else
AH->OF = cfopen(filename, mode, compression_spec);
CFH = InitCompressFileHandle(compression_spec);
if (!AH->OF)
if (CFH->open_func(filename, fn, mode, CFH))
{
if (filename)
pg_fatal("could not open output file \"%s\": %m", filename);
else
pg_fatal("could not open output file: %m");
}
AH->OF = CFH;
}
static cfp *
static CompressFileHandle *
SaveOutput(ArchiveHandle *AH)
{
return (cfp *) AH->OF;
return (CompressFileHandle *) AH->OF;
}
static void
RestoreOutput(ArchiveHandle *AH, cfp *savedOutput)
RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
{
int res;
errno = 0;
res = cfclose(AH->OF);
res = EndCompressFileHandle(AH->OF);
if (res != 0)
pg_fatal("could not close output file: %m");
@@ -1690,7 +1691,11 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
else if (RestoringToDB(AH))
bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
else
bytes_written = cfwrite(ptr, size * nmemb, AH->OF);
{
CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
bytes_written = CFH->write_func(ptr, size * nmemb, CFH);
}
if (bytes_written != size * nmemb)
WRITE_ERROR_EXIT;
@@ -2032,6 +2037,18 @@ ReadStr(ArchiveHandle *AH)
return buf;
}
/*
 * Check whether "filename" is present inside directory "dir" as a regular
 * file.
 *
 * The two components are joined with a '/' into a MAXPGPATH-sized buffer;
 * if the joined path does not fit, pg_fatal() is called with the directory
 * name.  Returns true only when stat() succeeds on the joined path and the
 * entry is a regular file.
 */
static bool
_fileExistsInDirectory(const char *dir, const char *filename)
{
	char		path[MAXPGPATH];
	struct stat statbuf;
	int			needed;

	/* snprintf reports the length it wanted; >= buffer size means truncation */
	needed = snprintf(path, MAXPGPATH, "%s/%s", dir, filename);
	if (needed >= MAXPGPATH)
		pg_fatal("directory name too long: \"%s\"", dir);

	/* Anything we cannot stat is treated as "not there" */
	if (stat(path, &statbuf) != 0)
		return false;

	/* Directories, sockets, etc. do not count */
	if (!S_ISREG(statbuf.st_mode))
		return false;

	return true;
}
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
@@ -2062,26 +2079,12 @@ _discoverArchiveFormat(ArchiveHandle *AH)
*/
if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
{
char buf[MAXPGPATH];
if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
pg_fatal("directory name too long: \"%s\"",
AH->fSpec);
if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
{
AH->format = archDirectory;
AH->format = archDirectory;
if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
return AH->format;
}
#ifdef HAVE_LIBZ
if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
pg_fatal("directory name too long: \"%s\"",
AH->fSpec);
if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
{
AH->format = archDirectory;
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
return AH->format;
}
#endif
pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
AH->fSpec);
@@ -2179,6 +2182,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
SetupWorkerPtrType setupWorkerPtr)
{
ArchiveHandle *AH;
CompressFileHandle *CFH;
pg_compress_specification out_compress_spec = {0};
pg_log_debug("allocating AH for %s, format %d",
@@ -2234,7 +2238,10 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
/* Open stdout with no compression for AH output handle */
out_compress_spec.algorithm = PG_COMPRESSION_NONE;
AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec);
CFH = InitCompressFileHandle(out_compress_spec);
if (CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
pg_fatal("could not open stdout for appending: %m");
AH->OF = CFH;
/*
* On Windows, we need to use binary mode to read/write non-text files,
@@ -3646,12 +3653,7 @@ WriteHead(ArchiveHandle *AH)
AH->WriteBytePtr(AH, AH->intSize);
AH->WriteBytePtr(AH, AH->offSize);
AH->WriteBytePtr(AH, AH->format);
/*
* For now the compression type is implied by the level. This will need
* to change once support for more compression algorithms is added,
* requiring a format bump.
*/
WriteInt(AH, AH->compression_spec.level);
AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
crtm = *localtime(&AH->createDate);
WriteInt(AH, crtm.tm_sec);
WriteInt(AH, crtm.tm_min);
@@ -3723,10 +3725,11 @@ ReadHead(ArchiveHandle *AH)
pg_fatal("expected format (%d) differs from format found in file (%d)",
AH->format, fmt);
/* Guess the compression method based on the level */
AH->compression_spec.algorithm = PG_COMPRESSION_NONE;
if (AH->version >= K_VERS_1_2)
if (AH->version >= K_VERS_1_15)
AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
else if (AH->version >= K_VERS_1_2)
{
/* Guess the compression method based on the level */
if (AH->version < K_VERS_1_4)
AH->compression_spec.level = AH->ReadBytePtr(AH);
else