1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Provide for parallel restoration from a custom format archive. Each data and
post-data step is run in a separate worker child (a thread on Windows, a child
process elsewhere) up to the concurrent number specified by the new pg_restore
command-line --multi-thread | -m switch.

Andrew Dunstan, with some editing by Tom Lane.
This commit is contained in:
Andrew Dunstan
2009-02-02 20:07:37 +00:00
parent 3a5b773715
commit 775f1b379e
11 changed files with 1509 additions and 276 deletions

View File

@ -19,7 +19,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.40 2007/10/28 21:55:52 tgl Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.41 2009/02/02 20:07:37 adunstan Exp $
*
*-------------------------------------------------------------------------
*/
@ -40,6 +40,7 @@ static int _ReadByte(ArchiveHandle *);
static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
@ -54,6 +55,8 @@ static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
/*------------
* Buffers used in zlib compression and extra data stored in archive and
@ -120,6 +123,7 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
AH->ReopenPtr = _ReopenArchive;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
@ -129,6 +133,8 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
AH->StartBlobPtr = _StartBlob;
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
/*
* Set up some special context used in compressing data.
@ -569,7 +575,6 @@ _PrintData(ArchiveHandle *AH)
zp->avail_in = blkLen;
#ifdef HAVE_LIBZ
if (AH->compression != 0)
{
while (zp->avail_in != 0)
@ -585,15 +590,12 @@ _PrintData(ArchiveHandle *AH)
}
}
else
{
#endif
{
in[zp->avail_in] = '\0';
ahwrite(in, 1, zp->avail_in, AH);
zp->avail_in = 0;
#ifdef HAVE_LIBZ
}
#endif
blkLen = ReadInt(AH);
}
@ -822,11 +824,9 @@ _CloseArchive(ArchiveHandle *AH)
* expect to be doing seeks to read the data back - it may be ok to
* just use the existing self-consistent block formatting.
*/
if (ctx->hasSeek)
{
fseeko(AH->FH, tpos, SEEK_SET);
if (ctx->hasSeek &&
fseeko(AH->FH, tpos, SEEK_SET) == 0)
WriteToc(AH);
}
}
if (fclose(AH->FH) != 0)
@ -835,6 +835,48 @@ _CloseArchive(ArchiveHandle *AH)
AH->FH = NULL;
}
/*
 * Reopen the archive's file handle.
 *
 * The current offset of AH->FH is recorded, the file named by AH->fSpec is
 * opened again for binary reading, and the new handle is positioned at the
 * recorded offset.  On return AH->FH refers to the newly opened handle.
 *
 * We close the original file handle, except on Windows.  (The difference
 * is because on Windows, this is used within a multithreading context,
 * and we don't want a thread closing the parent file handle.)
 *
 * Only a seekable input archive that was read from a named file can be
 * reopened; any other case, and any I/O failure, is reported through
 * die_horribly().
 */
static void
_ReopenArchive(ArchiveHandle *AH)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	pgoff_t		tpos;

	if (AH->mode == archModeWrite)
		die_horribly(AH, modulename, "can only reopen input archives\n");
	/* A missing or empty file name means the archive came from stdin. */
	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
		die_horribly(AH, modulename, "cannot reopen stdin\n");
	if (!ctx->hasSeek)
		die_horribly(AH, modulename, "cannot reopen non-seekable file\n");

	/*
	 * Remember where we are.  ftello() signals failure through its return
	 * value; errno is only meaningful once that has happened, because a
	 * successful call is allowed to leave a stale non-zero errno behind.
	 */
	tpos = ftello(AH->FH);
	if (tpos < 0)
		die_horribly(AH, modulename, "could not determine seek position in archive file: %s\n",
					 strerror(errno));

#ifndef WIN32
	if (fclose(AH->FH) != 0)
		die_horribly(AH, modulename, "could not close archive file: %s\n",
					 strerror(errno));
#endif

	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
	if (!AH->FH)
		die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
					 AH->fSpec, strerror(errno));

	/* Put the fresh handle back at the position the old one had. */
	if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
		die_horribly(AH, modulename, "could not set seek position in archive file: %s\n",
					 strerror(errno));
}
/*--------------------------------------------------
* END OF FORMAT CALLBACKS
*--------------------------------------------------
@ -990,7 +1032,6 @@ _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush)
/*
* Terminate zlib context and flush its buffers. If no zlib
* then just return.
*
*/
static void
_EndDataCompressor(ArchiveHandle *AH, TocEntry *te)
@ -1020,3 +1061,44 @@ _EndDataCompressor(ArchiveHandle *AH, TocEntry *te)
/* Send the end marker */
WriteInt(AH, 0);
}
/*
 * Clone format-specific fields during parallel restoration.
 *
 * AH->formatData is replaced by a newly allocated lclContext that starts
 * out as a byte-for-byte copy of the current one.  The zlib stream struct
 * and the zlibOut/zlibIn buffers in the copy are then pointed at fresh
 * allocations of their own.  Any allocation failure is reported through
 * die_horribly().
 */
static void
_Clone(ArchiveHandle *AH)
{
	lclContext *parent = (lclContext *) AH->formatData;
	lclContext *child = (lclContext *) malloc(sizeof(lclContext));

	/* Install the new context first, then verify the allocation. */
	AH->formatData = child;
	if (child == NULL)
		die_horribly(AH, modulename, "out of memory\n");
	memcpy(child, parent, sizeof(lclContext));

	/* The copied pointers still refer to the parent's buffers; replace them. */
	child->zlibIn = (char *) malloc(child->inSize);
	child->zlibOut = (char *) malloc(zlibOutSize + 1);
	child->zp = (z_streamp) malloc(sizeof(z_stream));

	if (!(child->zp != NULL && child->zlibOut != NULL && child->zlibIn != NULL))
		die_horribly(AH, modulename, "out of memory\n");

	/*
	 * No separate lo_buf is set up here: at most one BLOBS entry is expected
	 * per archive, so it cannot be processed in parallel.  TOC-entry-local
	 * state needs no copying either, since any given TOC entry is handled by
	 * exactly one worker child.
	 */
}
/*
 * Release the format-specific state attached to AH: the zlib stream
 * struct, the zlibIn and zlibOut buffers, and finally the lclContext
 * itself.  AH->formatData is not reset here.
 */
static void
_DeClone(ArchiveHandle *AH)
{
	lclContext *clone = (lclContext *) AH->formatData;

	free(clone->zp);
	free(clone->zlibIn);
	free(clone->zlibOut);
	free(clone);
}