1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Add parallel pg_dump option.

New infrastructure is added which creates a set number of workers
(threads on Windows, forked processes on Unix). Jobs are then
handed out to these workers by the master process as needed.
pg_restore is adjusted to use this new infrastructure in place of the
old setup which created a new worker for each step on the fly. Parallel
dumps acquire a snapshot clone in order to stay consistent, if
available.

The parallel option is selected by the -j / --jobs command line
parameter of pg_dump.

Joachim Wieland, lightly editorialized by Andrew Dunstan.
This commit is contained in:
Andrew Dunstan
2013-03-24 11:27:20 -04:00
parent 3b91fe185a
commit 9e257a181c
22 changed files with 2776 additions and 830 deletions

View File

@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
OBJS= pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
pg_backup_null.o pg_backup_tar.o \
pg_backup_null.o pg_backup_tar.o parallel.o \
pg_backup_directory.o dumputils.o compress_io.o $(WIN32RES)
KEYWRDOBJS = keywords.o kwlookup.o

View File

@ -54,6 +54,7 @@
#include "compress_io.h"
#include "dumputils.h"
#include "parallel.h"
/*----------------------
* Compressor API
@ -182,6 +183,9 @@ size_t
WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen)
{
/* Are we aborting? */
checkAborting(AH);
switch (cs->comprAlg)
{
case COMPR_ALG_LIBZ:
@ -351,6 +355,9 @@ ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
/* no minimal chunk size for zlib */
while ((cnt = readF(AH, &buf, &buflen)))
{
/* Are we aborting? */
checkAborting(AH);
zp->next_in = (void *) buf;
zp->avail_in = cnt;
@ -411,6 +418,9 @@ ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
while ((cnt = readF(AH, &buf, &buflen)))
{
/* Are we aborting? */
checkAborting(AH);
ahwrite(buf, 1, cnt, AH);
}

View File

@ -38,6 +38,7 @@ static struct
} on_exit_nicely_list[MAX_ON_EXIT_NICELY];
static int on_exit_nicely_index;
void (*on_exit_msg_func) (const char *modulename, const char *fmt, va_list ap) = vwrite_msg;
#define supports_grant_options(version) ((version) >= 70400)
@ -48,11 +49,21 @@ static bool parseAclItem(const char *item, const char *type,
static char *copyAclUserName(PQExpBuffer output, char *input);
static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
const char *subname);
static PQExpBuffer getThreadLocalPQExpBuffer(void);
#ifdef WIN32
static void shutdown_parallel_dump_utils(int code, void *unused);
static bool parallel_init_done = false;
static DWORD tls_index;
static DWORD mainThreadId;
/*
 * Exit callback that releases Winsock with WSACleanup().
 *
 * The cleanup is performed only when the calling thread is the one whose id
 * was stored in mainThreadId; on any other thread this is a no-op.  Both
 * arguments are ignored.
 */
static void
shutdown_parallel_dump_utils(int code, void *unused)
{
	/* Only the main thread may tear down Winsock */
	if (GetCurrentThreadId() != mainThreadId)
		return;

	WSACleanup();
}
#endif
void
@ -61,23 +72,29 @@ init_parallel_dump_utils(void)
#ifdef WIN32
if (!parallel_init_done)
{
WSADATA wsaData;
int err;
tls_index = TlsAlloc();
parallel_init_done = true;
mainThreadId = GetCurrentThreadId();
err = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (err != 0)
{
fprintf(stderr, _("WSAStartup failed: %d\n"), err);
exit_nicely(1);
}
on_exit_nicely(shutdown_parallel_dump_utils, NULL);
parallel_init_done = true;
}
#endif
}
/*
* Quotes input string if it's not a legitimate SQL identifier as-is.
*
* Note that the returned string must be used before calling fmtId again,
* since we re-use the same return buffer each time. Non-reentrant but
* reduces memory leakage. (On Windows the memory leakage will be one buffer
* per thread, which is at least better than one per call).
* Non-reentrant but reduces memory leakage. (On Windows the memory leakage
* will be one buffer per thread, which is at least better than one per call).
*/
const char *
fmtId(const char *rawid)
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
/*
* The Tls code goes awry if we use a static var, so we provide for both
@ -86,9 +103,6 @@ fmtId(const char *rawid)
static PQExpBuffer s_id_return = NULL;
PQExpBuffer id_return;
const char *cp;
bool need_quotes = false;
#ifdef WIN32
if (parallel_init_done)
id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */
@ -118,6 +132,23 @@ fmtId(const char *rawid)
}
return id_return;
}
/*
* Quotes input string if it's not a legitimate SQL identifier as-is.
*
* Note that the returned string must be used before calling fmtId again,
* since we re-use the same return buffer each time.
*/
const char *
fmtId(const char *rawid)
{
PQExpBuffer id_return = getThreadLocalPQExpBuffer();
const char *cp;
bool need_quotes = false;
/*
* These checks need to match the identifier production in scan.l. Don't
* use islower() etc.
@ -185,6 +216,35 @@ fmtId(const char *rawid)
return id_return->data;
}
/*
* fmtQualifiedId - convert a qualified name to the proper format for
* the source database.
*
* Like fmtId, use the result before calling again.
*
* Since we call fmtId and it also uses getThreadLocalPQExpBuffer() we cannot
* use it until we're finished with calling fmtId().
*/
const char *
fmtQualifiedId(int remoteVersion, const char *schema, const char *id)
{
PQExpBuffer id_return;
PQExpBuffer lcl_pqexp = createPQExpBuffer();
/* Suppress schema name if fetching from pre-7.3 DB */
if (remoteVersion >= 70300 && schema && *schema)
{
appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema));
}
appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id));
id_return = getThreadLocalPQExpBuffer();
appendPQExpBuffer(id_return, "%s", lcl_pqexp->data);
destroyPQExpBuffer(lcl_pqexp);
return id_return->data;
}
/*
* Convert a string value to an SQL string literal and append it to
@ -1315,7 +1375,7 @@ exit_horribly(const char *modulename, const char *fmt,...)
va_list ap;
va_start(ap, fmt);
vwrite_msg(modulename, fmt, ap);
on_exit_msg_func(modulename, fmt, ap);
va_end(ap);
exit_nicely(1);

View File

@ -29,14 +29,14 @@ typedef enum /* bits returned by set_dump_section */
typedef struct SimpleStringListCell
{
struct SimpleStringListCell *next;
char val[1]; /* VARIABLE LENGTH FIELD */
struct SimpleStringListCell *next;
char val[1]; /* VARIABLE LENGTH FIELD */
} SimpleStringListCell;
typedef struct SimpleStringList
{
SimpleStringListCell *head;
SimpleStringListCell *tail;
SimpleStringListCell *head;
SimpleStringListCell *tail;
} SimpleStringList;
@ -47,6 +47,8 @@ extern const char *progname;
extern void init_parallel_dump_utils(void);
extern const char *fmtId(const char *identifier);
extern const char *fmtQualifiedId(int remoteVersion,
const char *schema, const char *id);
extern void appendStringLiteral(PQExpBuffer buf, const char *str,
int encoding, bool std_strings);
extern void appendStringLiteralConn(PQExpBuffer buf, const char *str,
@ -85,11 +87,12 @@ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
extern void
exit_horribly(const char *modulename, const char *fmt,...)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn));
extern void (*on_exit_msg_func) (const char *modulename, const char *fmt, va_list ap)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
extern void on_exit_nicely(on_exit_nicely_callback function, void *arg);
extern void exit_nicely(int code) __attribute__((noreturn));
extern void simple_string_list_append(SimpleStringList *list, const char *val);
extern bool simple_string_list_member(SimpleStringList *list, const char *val);
#endif /* DUMPUTILS_H */

1293
src/bin/pg_dump/parallel.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,85 @@
/*-------------------------------------------------------------------------
*
* parallel.h
*
* Parallel support header file for the pg_dump archiver
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* The author is not responsible for loss or damages that may
* result from its use.
*
* IDENTIFICATION
* src/bin/pg_dump/parallel.h
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DUMP_PARALLEL_H
#define PG_DUMP_PARALLEL_H

/*
 * The include guard above lets this header be pulled in more than once
 * (directly and through other headers) without redefining the typedefs
 * declared below.
 */

#include "pg_backup_db.h"

struct _archiveHandle;
struct _tocEntry;

/* State of one worker, stored in ParallelSlot.workerStatus */
typedef enum
{
	WRKR_TERMINATED = 0,
	WRKR_IDLE,
	WRKR_WORKING,
	WRKR_FINISHED
} T_WorkerStatus;

/*
 * Kind of job handed to a worker.  No comma follows the last enumerator,
 * since C89 compilers reject a trailing comma in an enumerator list.
 */
typedef enum T_Action
{
	ACT_DUMP,
	ACT_RESTORE
} T_Action;

/* Arguments needed for a worker process */
typedef struct ParallelArgs
{
	struct _archiveHandle *AH;	/* archive handle the job operates on */
	struct _tocEntry *te;		/* TOC entry to be processed */
} ParallelArgs;

/* State for each parallel activity slot */
typedef struct ParallelSlot
{
	ParallelArgs *args;
	T_WorkerStatus workerStatus;
	int			status;
	/*
	 * NOTE(review): presumably the descriptors of the two pipes connecting
	 * master and worker (one per direction) -- confirm against parallel.c.
	 */
	int			pipeRead;
	int			pipeWrite;
	int			pipeRevRead;
	int			pipeRevWrite;
#ifdef WIN32
	/* workers are identified by thread handle and id on Windows */
	uintptr_t	hThread;
	unsigned int threadId;
#else
	/* ... and by process id elsewhere */
	pid_t		pid;
#endif
} ParallelSlot;

/* NOTE(review): presumably the "no slot available" slot index -- confirm */
#define NO_SLOT (-1)

/* The set of worker slots */
typedef struct ParallelState
{
	int			numWorkers;		/* number of entries in parallelSlot */
	ParallelSlot *parallelSlot;
} ParallelState;

extern int	GetIdleWorker(ParallelState *pstate);
extern bool IsEveryWorkerIdle(ParallelState *pstate);
extern void ListenToWorkers(struct _archiveHandle * AH, ParallelState *pstate, bool do_wait);
extern int	ReapWorkerStatus(ParallelState *pstate, int *status);
extern void EnsureIdleWorker(struct _archiveHandle * AH, ParallelState *pstate);
extern void EnsureWorkersFinished(struct _archiveHandle * AH, ParallelState *pstate);

extern ParallelState *ParallelBackupStart(struct _archiveHandle * AH,
					RestoreOptions *ropt);
extern void DispatchJobForTocEntry(struct _archiveHandle * AH,
					   ParallelState *pstate,
					   struct _tocEntry * te, T_Action act);
extern void ParallelBackupEnd(struct _archiveHandle * AH, ParallelState *pstate);

extern void checkAborting(struct _archiveHandle * AH);

#endif   /* PG_DUMP_PARALLEL_H */

View File

@ -82,9 +82,14 @@ struct Archive
int minRemoteVersion; /* allowable range */
int maxRemoteVersion;
int numWorkers; /* number of parallel processes */
char *sync_snapshot_id; /* sync snapshot id for parallel
* operation */
/* info needed for string escaping */
int encoding; /* libpq code for client_encoding */
bool std_strings; /* standard_conforming_strings */
char *use_role; /* Issue SET ROLE to this */
/* error handling */
bool exit_on_error; /* whether to exit on SQL errors... */
@ -142,11 +147,12 @@ typedef struct _restoreOptions
int suppressDumpWarnings; /* Suppress output of WARNING entries
* to stderr */
bool single_txn;
int number_of_jobs;
bool *idWanted; /* array showing which dump IDs to emit */
} RestoreOptions;
typedef void (*SetupWorkerPtr) (Archive *AH, RestoreOptions *ropt);
/*
* Main archiver interface.
*/
@ -189,7 +195,8 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
/* Create a new archive */
extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const int compression, ArchiveMode mode);
const int compression, ArchiveMode mode,
SetupWorkerPtr setupDumpWorker);
/* The --list option */
extern void PrintTOCSummary(Archive *AH, RestoreOptions *ropt);

File diff suppressed because it is too large Load Diff

View File

@ -100,8 +100,21 @@ typedef z_stream *z_streamp;
#define K_OFFSET_POS_SET 2
#define K_OFFSET_NO_DATA 3
/*
* Special exit values from worker children. We reserve 0 for normal
* success; 1 and other small values should be interpreted as crashes.
*/
#define WORKER_OK 0
#define WORKER_CREATE_DONE 10
#define WORKER_INHIBIT_DATA 11
#define WORKER_IGNORED_ERRORS 12
struct _archiveHandle;
struct _tocEntry;
struct _restoreList;
struct ParallelArgs;
struct ParallelState;
enum T_Action;
typedef void (*ClosePtr) (struct _archiveHandle * AH);
typedef void (*ReopenPtr) (struct _archiveHandle * AH);
@ -129,6 +142,13 @@ typedef void (*PrintTocDataPtr) (struct _archiveHandle * AH, struct _tocEntry *
typedef void (*ClonePtr) (struct _archiveHandle * AH);
typedef void (*DeClonePtr) (struct _archiveHandle * AH);
typedef char *(*WorkerJobRestorePtr) (struct _archiveHandle * AH, struct _tocEntry * te);
typedef char *(*WorkerJobDumpPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
typedef char *(*MasterStartParallelItemPtr) (struct _archiveHandle * AH, struct _tocEntry * te,
enum T_Action act);
typedef int (*MasterEndParallelItemPtr) (struct _archiveHandle * AH, struct _tocEntry * te,
const char *str, enum T_Action act);
typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len);
typedef enum
@ -227,6 +247,13 @@ typedef struct _archiveHandle
StartBlobPtr StartBlobPtr;
EndBlobPtr EndBlobPtr;
MasterStartParallelItemPtr MasterStartParallelItemPtr;
MasterEndParallelItemPtr MasterEndParallelItemPtr;
SetupWorkerPtr SetupWorkerPtr;
WorkerJobDumpPtr WorkerJobDumpPtr;
WorkerJobRestorePtr WorkerJobRestorePtr;
ClonePtr ClonePtr; /* Clone format-specific fields */
DeClonePtr DeClonePtr; /* Clean up cloned fields */
@ -236,6 +263,7 @@ typedef struct _archiveHandle
char *archdbname; /* DB name *read* from archive */
enum trivalue promptPassword;
char *savedPassword; /* password for ropt->username, if known */
char *use_role;
PGconn *connection;
int connectToDB; /* Flag to indicate if direct DB connection is
* required */
@ -327,6 +355,7 @@ typedef struct _tocEntry
int nLockDeps; /* number of such dependencies */
} TocEntry;
extern int parallel_restore(struct ParallelArgs * args);
extern void on_exit_close_archive(Archive *AHX);
extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
@ -337,9 +366,13 @@ extern void WriteHead(ArchiveHandle *AH);
extern void ReadHead(ArchiveHandle *AH);
extern void WriteToc(ArchiveHandle *AH);
extern void ReadToc(ArchiveHandle *AH);
extern void WriteDataChunks(ArchiveHandle *AH);
extern void WriteDataChunks(ArchiveHandle *AH, struct ParallelState *pstate);
extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
extern void DeCloneArchive(ArchiveHandle *AH);
extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id);
TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
extern bool checkSeek(FILE *fp);
#define appendStringLiteralAHX(buf,str,AH) \

View File

@ -26,6 +26,7 @@
#include "compress_io.h"
#include "dumputils.h"
#include "parallel.h"
/*--------
* Routines in the format interface
@ -59,6 +60,10 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
typedef struct
{
CompressorState *cs;
@ -127,6 +132,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
/* no parallel dump in the custom archive, only parallel restore */
AH->WorkerJobDumpPtr = NULL;
AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
/* Set up a private area. */
ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
AH->formatData = (void *) ctx;
@ -698,7 +710,7 @@ _CloseArchive(ArchiveHandle *AH)
tpos = ftello(AH->FH);
WriteToc(AH);
ctx->dataStart = _getFilePos(AH, ctx);
WriteDataChunks(AH);
WriteDataChunks(AH, NULL);
/*
* If possible, re-write the TOC in order to update the data offset
@ -796,6 +808,80 @@ _DeClone(ArchiveHandle *AH)
free(ctx);
}
/*
 * Worker-side job routine for restoring one TOC entry of a custom-format
 * archive.
 *
 * Runs parallel_restore() on the entry and returns a pg_malloc'ed reply of
 * the form "OK RESTORE <dumpId> <status> <n_errors>".  The error count is
 * AH->public.n_errors when the status is WORKER_IGNORED_ERRORS and 0
 * otherwise.
 */
char *
_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
{
	/*
	 * The reply is a short fixed-size string plus a few numbers.  It is
	 * heap-allocated rather than static because workers are threads on
	 * Windows.
	 */
	const size_t replyLen = 64;
	char	   *reply = (char *) pg_malloc(replyLen);
	ParallelArgs job;
	int			result;
	int			ignoredErrors = 0;

	job.AH = AH;
	job.te = te;
	result = parallel_restore(&job);

	/* the error count is only reported for the "ignored errors" outcome */
	if (result == WORKER_IGNORED_ERRORS)
		ignoredErrors = AH->public.n_errors;

	snprintf(reply, replyLen, "OK RESTORE %d %d %d", te->dumpId, result,
			 ignoredErrors);

	return reply;
}
/*
 * Master-side routine that builds the command string for one work item.
 *
 * The custom format supports parallel restore only, so the command is always
 * "RESTORE <dumpId>".  The returned pointer refers to a static buffer that is
 * overwritten by the next call.
 */
static char *
_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
{
	/*
	 * A static buffer (short fixed-size string + number) is fine here, even
	 * on Windows, because only the master calls this function.
	 */
	static char command[64];

	/* no parallel dump in the custom archive format */
	Assert(act == ACT_RESTORE);

	snprintf(command, sizeof(command), "RESTORE %d", te->dumpId);

	return command;
}
/*
 * Master-side routine that analyzes a worker's reply for one work item.
 *
 * str is expected to hold exactly "<dumpId> <status> <n_errors>".
 * NOTE(review): the worker's reply carries an "OK RESTORE " prefix, so the
 * caller presumably strips it before calling -- confirm in parallel.c.
 *
 * The reported error count is added to AH->public.n_errors and the reported
 * status is returned.  If the reply cannot be parsed, nothing is added and 1
 * is returned instead of using indeterminate values.  pg_backup_archiver.h
 * describes small nonzero worker exit values as crashes.
 * NOTE(review): confirm that the caller treats a return of 1 as a failure.
 */
static int
_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
{
	DumpId		dumpId = 0;
	int			nBytes = 0;
	int			status = 0;
	int			n_errors = 0;
	int			nFields;

	/* no parallel dump in the custom archive */
	Assert(act == ACT_RESTORE);

	/*
	 * The targets are ints and the worker formats them with %d, so scan them
	 * with %d as well.  %n does not count towards sscanf's return value.
	 */
	nFields = sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
	Assert(nFields == 3);
	if (nFields != 3)
		return 1;

	Assert(nBytes == strlen(str));
	Assert(dumpId == te->dumpId);

	AH->public.n_errors += n_errors;

	return status;
}
/*--------------------------------------------------
* END OF FORMAT CALLBACKS
*--------------------------------------------------

View File

@ -309,12 +309,30 @@ ConnectDatabase(Archive *AHX,
PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
}
/*
 * Close the connection to the database and also cancel off the query if we
 * have one running.
 *
 * Does nothing when the archive has no connection.  The connection is
 * finished exactly once, after any cancel request has been sent, and
 * AH->connection is reset to NULL afterwards.
 */
void
DisconnectDatabase(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	PGcancel   *cancel;
	char		errbuf[1];

	if (!AH->connection)
		return;

	/*
	 * The handle must still be valid while we inspect it and request the
	 * cancel, so PQfinish() may only be called once that is done.
	 */
	if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
	{
		if ((cancel = PQgetCancel(AH->connection)))
		{
			/* best effort: the result and error text are ignored */
			PQcancel(cancel, errbuf, sizeof(errbuf));
			PQfreeCancel(cancel);
		}
	}

	PQfinish(AH->connection);
	AH->connection = NULL;
}

View File

@ -35,6 +35,7 @@
#include "compress_io.h"
#include "dumputils.h"
#include "parallel.h"
#include <dirent.h>
#include <sys/stat.h>
@ -50,6 +51,7 @@ typedef struct
cfp *dataFH; /* currently open data file */
cfp *blobsTocFH; /* file handle for blobs.toc */
ParallelState *pstate; /* for parallel backup / restore */
} lclContext;
typedef struct
@ -70,6 +72,7 @@ static int _ReadByte(ArchiveHandle *);
static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
@ -82,8 +85,17 @@ static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
const char *str, T_Action act);
static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
static void setFilePath(ArchiveHandle *AH, char *buf,
const char *relativeFilename);
/*
* Init routine required by ALL formats. This is a global routine
@ -110,7 +122,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
AH->ReopenPtr = NULL;
AH->ReopenPtr = _ReopenArchive;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
@ -121,8 +133,14 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
AH->ClonePtr = NULL;
AH->DeClonePtr = NULL;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
/* Set up our private context */
ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
@ -146,16 +164,41 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
if (AH->mode == archModeWrite)
{
if (mkdir(ctx->directory, 0700) < 0)
struct stat st;
bool is_empty = false;
/* we accept an empty existing directory */
if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
{
DIR *dir = opendir(ctx->directory);
if (dir)
{
struct dirent *d;
is_empty = true;
while ((d = readdir(dir)))
{
if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
{
is_empty = false;
break;
}
}
closedir(dir);
}
}
if (!is_empty && mkdir(ctx->directory, 0700) < 0)
exit_horribly(modulename, "could not create directory \"%s\": %s\n",
ctx->directory, strerror(errno));
}
else
{ /* Read Mode */
char *fname;
char fname[MAXPGPATH];
cfp *tocFH;
fname = prependDirectory(AH, "toc.dat");
setFilePath(AH, fname, "toc.dat");
tocFH = cfopen_read(fname, PG_BINARY_R);
if (tocFH == NULL)
@ -281,9 +324,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
{
lclTocEntry *tctx = (lclTocEntry *) te->formatData;
lclContext *ctx = (lclContext *) AH->formatData;
char *fname;
char fname[MAXPGPATH];
fname = prependDirectory(AH, tctx->filename);
setFilePath(AH, fname, tctx->filename);
ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
if (ctx->dataFH == NULL)
@ -308,6 +351,9 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
if (dLen == 0)
return 0;
/* Are we aborting? */
checkAborting(AH);
return cfwrite(data, dLen, ctx->dataFH);
}
@ -375,8 +421,9 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
_LoadBlobs(AH, ropt);
else
{
char *fname = prependDirectory(AH, tctx->filename);
char fname[MAXPGPATH];
setFilePath(AH, fname, tctx->filename);
_PrintFileData(AH, fname, ropt);
}
}
@ -386,12 +433,12 @@ _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
char *fname;
char fname[MAXPGPATH];
char line[MAXPGPATH];
StartRestoreBlobs(AH);
fname = prependDirectory(AH, "blobs.toc");
setFilePath(AH, fname, "blobs.toc");
ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
@ -474,6 +521,9 @@ _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
lclContext *ctx = (lclContext *) AH->formatData;
size_t res;
/* Are we aborting? */
checkAborting(AH);
res = cfwrite(buf, len, ctx->dataFH);
if (res != len)
exit_horribly(modulename, "could not write to output file: %s\n",
@ -518,7 +568,12 @@ _CloseArchive(ArchiveHandle *AH)
if (AH->mode == archModeWrite)
{
cfp *tocFH;
char *fname = prependDirectory(AH, "toc.dat");
char fname[MAXPGPATH];
setFilePath(AH, fname, "toc.dat");
/* this will actually fork the processes for a parallel backup */
ctx->pstate = ParallelBackupStart(AH, NULL);
/* The TOC is always created uncompressed */
tocFH = cfopen_write(fname, PG_BINARY_W, 0);
@ -539,11 +594,25 @@ _CloseArchive(ArchiveHandle *AH)
if (cfclose(tocFH) != 0)
exit_horribly(modulename, "could not close TOC file: %s\n",
strerror(errno));
WriteDataChunks(AH);
WriteDataChunks(AH, ctx->pstate);
ParallelBackupEnd(AH, ctx->pstate);
}
AH->FH = NULL;
}
/*
 * Reopen the archive's file handle.
 *
 * For the directory format this is deliberately a no-op: our TOC is in
 * memory, and the data files are separate files that each child opens on its
 * own anyway.
 */
static void
_ReopenArchive(ArchiveHandle *AH)
{
	/* nothing to do, see above */
}
/*
* BLOB support
@ -560,9 +629,9 @@ static void
_StartBlobs(ArchiveHandle *AH, TocEntry *te)
{
lclContext *ctx = (lclContext *) AH->formatData;
char *fname;
char fname[MAXPGPATH];
fname = prependDirectory(AH, "blobs.toc");
setFilePath(AH, fname, "blobs.toc");
/* The blob TOC file is never compressed */
ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
@ -627,12 +696,16 @@ _EndBlobs(ArchiveHandle *AH, TocEntry *te)
ctx->blobsTocFH = NULL;
}
static char *
prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
/*
* Gets a relative file name and prepends the output directory, writing the
* result to buf. The caller needs to make sure that buf is MAXPGPATH bytes
* big. Can't use a static char[MAXPGPATH] inside the function because we run
* multithreaded on Windows.
*/
static void
setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
{
lclContext *ctx = (lclContext *) AH->formatData;
static char buf[MAXPGPATH];
char *dname;
dname = ctx->directory;
@ -643,6 +716,157 @@ prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
strcpy(buf, dname);
strcat(buf, "/");
strcat(buf, relativeFilename);
}
/*
 * Clone format-specific fields during parallel restoration.
 *
 * Replaces AH->formatData with a freshly allocated lclContext holding a
 * member-for-member copy of the current one.
 */
static void
_Clone(ArchiveHandle *AH)
{
	lclContext *parentCtx = (lclContext *) AH->formatData;
	lclContext *cloneCtx = (lclContext *) pg_malloc(sizeof(lclContext));

	*cloneCtx = *parentCtx;
	AH->formatData = cloneCtx;

	/*
	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
	 * entry per archive, so no parallelism is possible.  Likewise,
	 * TOC-entry-local state isn't an issue because any one TOC entry is
	 * touched by just one worker child.
	 */

	/*
	 * The ParallelState pointer (pstate) is carried over as a plain pointer
	 * value; no separate ParallelState is set up for the clone, as only the
	 * master process ever writes to it.
	 */
}
/*
 * Release the format-specific context held in AH->formatData.
 * The pointer in AH is not reset.
 */
static void
_DeClone(ArchiveHandle *AH)
{
	free(AH->formatData);
}
/*
 * Master-side routine that builds the command string for one work item.
 *
 * Produces "DUMP <dumpId>" for ACT_DUMP and "RESTORE <dumpId>" for
 * ACT_RESTORE.  The returned pointer refers to a static buffer that is
 * overwritten by the next call; for any other action the buffer is returned
 * unmodified.
 */
static char *
_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
{
	/*
	 * A static buffer is okay here, even on Windows, because this function
	 * is only called from one process (the master).
	 */
	static char command[64];

	switch (act)
	{
		case ACT_DUMP:
			snprintf(command, sizeof(command), "DUMP %d", te->dumpId);
			break;
		case ACT_RESTORE:
			snprintf(command, sizeof(command), "RESTORE %d", te->dumpId);
			break;
	}

	return command;
}
/*
 * Worker-side job routine for dumping one TOC entry into a directory-format
 * archive.
 *
 * Writes the entry's data via WriteDataChunksForTocEntry() and returns a
 * pg_malloc'ed reply of the form "OK DUMP <dumpId>".
 *
 * Only the DumpId is reported, so an int return would do in principle.  A
 * string is kept because the result has to be sent as a message anyway and
 * it leaves room for further enhancements.
 */
static char *
_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
{
	/*
	 * The reply is a short fixed-size string plus an ID.  It is
	 * heap-allocated rather than static because workers are threads on
	 * Windows.
	 */
	const size_t replyLen = 64;
	char	   *reply = (char *) pg_malloc(replyLen);

	/* This should never happen */
	if (te->formatData == NULL)
		exit_horribly(modulename, "Error during backup\n");

	/*
	 * The call below returns nothing: it either succeeds or dies horribly,
	 * and the parent notices a failure when the child dies unexpectedly.
	 */
	WriteDataChunksForTocEntry(AH, te);

	snprintf(reply, replyLen, "OK DUMP %d", te->dumpId);

	return reply;
}
/*
 * Worker-side job routine for restoring one TOC entry of a directory-format
 * archive.
 *
 * Runs parallel_restore() on the entry and returns a pg_malloc'ed reply of
 * the form "OK RESTORE <dumpId> <status> <n_errors>".  The error count is
 * AH->public.n_errors when the status is WORKER_IGNORED_ERRORS and 0
 * otherwise.
 */
static char *
_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
{
	/*
	 * The reply is a short fixed-size string plus a few numbers.  It is
	 * heap-allocated rather than static because workers are threads on
	 * Windows.
	 */
	const size_t replyLen = 64;
	char	   *reply = (char *) pg_malloc(replyLen);
	ParallelArgs job;
	int			result;
	int			ignoredErrors = 0;

	job.AH = AH;
	job.te = te;
	result = parallel_restore(&job);

	/* the error count is only reported for the "ignored errors" outcome */
	if (result == WORKER_IGNORED_ERRORS)
		ignoredErrors = AH->public.n_errors;

	snprintf(reply, replyLen, "OK RESTORE %d %d %d", te->dumpId, result,
			 ignoredErrors);

	return reply;
}
/*
 * Master-side routine that analyzes a worker's reply for one work item.
 *
 * For ACT_DUMP, str is expected to hold exactly "<dumpId>" and 0 is
 * returned.  For ACT_RESTORE, str is expected to hold exactly
 * "<dumpId> <status> <n_errors>"; the error count is added to
 * AH->public.n_errors and the reported status is returned.
 * NOTE(review): the workers' replies carry an "OK DUMP " / "OK RESTORE "
 * prefix, so the caller presumably strips it -- confirm in parallel.c.
 *
 * If the reply cannot be parsed, nothing is added and 1 is returned instead
 * of using indeterminate values.  pg_backup_archiver.h describes small
 * nonzero worker exit values as crashes.
 * NOTE(review): confirm that the callers treat a return of 1 as a failure
 * for both actions.
 */
static int
_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
{
	DumpId		dumpId = 0;
	int			nBytes = 0;
	int			n_errors = 0;
	int			status = 0;
	int			nFields;

	/*
	 * The targets are ints and the workers format them with %d, so scan them
	 * with %d as well.  %n does not count towards sscanf's return value.
	 */
	if (act == ACT_DUMP)
	{
		nFields = sscanf(str, "%d%n", &dumpId, &nBytes);
		Assert(nFields == 1);
		if (nFields != 1)
			return 1;

		Assert(dumpId == te->dumpId);
		Assert(nBytes == strlen(str));
	}
	else if (act == ACT_RESTORE)
	{
		nFields = sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
		Assert(nFields == 3);
		if (nFields != 3)
			return 1;

		Assert(dumpId == te->dumpId);
		Assert(nBytes == strlen(str));

		AH->public.n_errors += n_errors;
	}

	return status;
}

View File

@ -158,6 +158,12 @@ InitArchiveFmt_Tar(ArchiveHandle *AH)
AH->ClonePtr = NULL;
AH->DeClonePtr = NULL;
AH->MasterStartParallelItemPtr = NULL;
AH->MasterEndParallelItemPtr = NULL;
AH->WorkerJobDumpPtr = NULL;
AH->WorkerJobRestorePtr = NULL;
/*
* Set up some special context used in compressing data.
*/
@ -828,7 +834,7 @@ _CloseArchive(ArchiveHandle *AH)
/*
* Now send the data (tables & blobs)
*/
WriteDataChunks(AH);
WriteDataChunks(AH, NULL);
/*
* Now this format wants to append a script which does a full restore

File diff suppressed because it is too large Load Diff

View File

@ -252,6 +252,7 @@ typedef struct _tableInfo
/* these two are set only if table is a sequence owned by a column: */
Oid owning_tab; /* OID of table owning sequence */
int owning_col; /* attr # of column owning sequence */
int relpages;
bool interesting; /* true if need to collect more data */
@ -315,6 +316,7 @@ typedef struct _indxInfo
bool indisclustered;
/* if there is an associated constraint object, its dumpId: */
DumpId indexconstraint;
int relpages; /* relpages of the underlying table */
} IndxInfo;
typedef struct _ruleInfo
@ -532,6 +534,7 @@ extern void sortDumpableObjects(DumpableObject **objs, int numObjs,
DumpId preBoundaryId, DumpId postBoundaryId);
extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
extern void sortDumpableObjectsByTypeOid(DumpableObject **objs, int numObjs);
extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
/*
* version specific routines

View File

@ -143,6 +143,96 @@ static void repairDependencyLoop(DumpableObject **loop,
static void describeDumpableObject(DumpableObject *obj,
char *buf, int bufsize);
static int DOSizeCompare(const void *p1, const void *p2);
/*
 * Return the index of the first entry of objs[0 .. numObjs-1] whose objType
 * equals type, or -1 if there is none.
 */
static int
findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
{
	int			pos = 0;

	while (pos < numObjs)
	{
		if (objs[pos]->objType == type)
			return pos;
		pos++;
	}

	return -1;
}
/*
 * Return the index of the first entry of objs[start .. numObjs-1] whose
 * objType differs from type.
 *
 * The result is used as the exclusive end of a run of same-typed objects.
 * When every entry from start onwards has the given type, the run extends to
 * the end of the array and numObjs is returned, so that the last element is
 * part of the run.
 */
static int
findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
{
	int			i;

	for (i = start; i < numObjs; i++)
		if (objs[i]->objType != type)
			return i;

	/* no differing entry: the run ends one past the last element */
	return numObjs;
}
/*
 * When we do a parallel dump, we want to start with the largest items first.
 *
 * Say we have the objects in this order:
 * ....DDDDD....III....
 *
 * with D = Table data, I = Index, . = other object
 *
 * For DO_TABLE_DATA and then for DO_INDEX, this locates the first object of
 * that type, takes the range from there up to the index reported by
 * findFirstDifferentType(), and sorts that range with DOSizeCompare.
 * Arrays with fewer than two objects are left alone.
 */
void
sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
{
	static const DumpableObjectType sizeSortedTypes[] = {DO_TABLE_DATA, DO_INDEX};
	const int	numTypes = (int) (sizeof(sizeSortedTypes) / sizeof(sizeSortedTypes[0]));
	int			t;

	if (numObjs <= 1)
		return;

	for (t = 0; t < numTypes; t++)
	{
		int			first;
		int			limit;

		first = findFirstEqualType(sizeSortedTypes[t], objs, numObjs);
		if (first < 0)
			continue;			/* no object of this type at all */

		limit = findFirstDifferentType(sizeSortedTypes[t], objs, numObjs, first);

		qsort((void *) (objs + first), limit - first, sizeof(DumpableObject *),
			  DOSizeCompare);
	}
}
static int
DOSizeCompare(const void *p1, const void *p2)
{
DumpableObject *obj1 = *(DumpableObject **) p1;
DumpableObject *obj2 = *(DumpableObject **) p2;
int obj1_size = 0;
int obj2_size = 0;
if (obj1->objType == DO_TABLE_DATA)
obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
if (obj1->objType == DO_INDEX)
obj1_size = ((IndxInfo *) obj1)->relpages;
if (obj2->objType == DO_TABLE_DATA)
obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
if (obj2->objType == DO_INDEX)
obj2_size = ((IndxInfo *) obj2)->relpages;
/* we want to see the biggest item go first */
if (obj1_size > obj2_size)
return -1;
if (obj2_size > obj1_size)
return 1;
return 0;
}
/*
* Sort the given objects into a type/name-based ordering
@ -735,7 +825,7 @@ repairViewRuleMultiLoop(DumpableObject *viewobj,
/* remove view's dependency on rule */
removeObjectDependency(viewobj, ruleobj->dumpId);
/* pretend view is a plain table and dump it that way */
viewinfo->relkind = 'r'; /* RELKIND_RELATION */
viewinfo->relkind = 'r'; /* RELKIND_RELATION */
/* mark rule as needing its own dump */
ruleinfo->separate = true;
/* move any reloptions from view to rule */

View File

@ -657,8 +657,8 @@ dumpRoles(PGconn *conn)
"rolcreaterole, rolcreatedb, "
"rolcanlogin, rolconnlimit, rolpassword, "
"rolvaliduntil, rolreplication, "
"pg_catalog.shobj_description(oid, 'pg_authid') as rolcomment, "
"rolname = current_user AS is_current_user "
"pg_catalog.shobj_description(oid, 'pg_authid') as rolcomment, "
"rolname = current_user AS is_current_user "
"FROM pg_authid "
"ORDER BY 2");
else if (server_version >= 80200)
@ -667,8 +667,8 @@ dumpRoles(PGconn *conn)
"rolcreaterole, rolcreatedb, "
"rolcanlogin, rolconnlimit, rolpassword, "
"rolvaliduntil, false as rolreplication, "
"pg_catalog.shobj_description(oid, 'pg_authid') as rolcomment, "
"rolname = current_user AS is_current_user "
"pg_catalog.shobj_description(oid, 'pg_authid') as rolcomment, "
"rolname = current_user AS is_current_user "
"FROM pg_authid "
"ORDER BY 2");
else if (server_version >= 80100)
@ -678,7 +678,7 @@ dumpRoles(PGconn *conn)
"rolcanlogin, rolconnlimit, rolpassword, "
"rolvaliduntil, false as rolreplication, "
"null as rolcomment, "
"rolname = current_user AS is_current_user "
"rolname = current_user AS is_current_user "
"FROM pg_authid "
"ORDER BY 2");
else
@ -694,7 +694,7 @@ dumpRoles(PGconn *conn)
"valuntil as rolvaliduntil, "
"false as rolreplication, "
"null as rolcomment, "
"rolname = current_user AS is_current_user "
"rolname = current_user AS is_current_user "
"FROM pg_shadow "
"UNION ALL "
"SELECT 0, groname as rolname, "
@ -755,7 +755,7 @@ dumpRoles(PGconn *conn)
* will acquire the right properties even if it already exists (ie, it
* won't hurt for the CREATE to fail). This is particularly important
* for the role we are connected as, since even with --clean we will
* have failed to drop it. binary_upgrade cannot generate any errors,
* have failed to drop it. binary_upgrade cannot generate any errors,
* so we assume the current role is already created.
*/
if (!binary_upgrade ||
@ -1857,8 +1857,8 @@ connectDatabase(const char *dbname, const char *connection_string,
}
/*
* Ok, connected successfully. Remember the options used, in the form of
* a connection string.
* Ok, connected successfully. Remember the options used, in the form of a
* connection string.
*/
connstr = constructConnStr(keywords, values);
@ -2039,7 +2039,7 @@ static void
doConnStrQuoting(PQExpBuffer buf, const char *str)
{
const char *s;
bool needquotes;
bool needquotes;
/*
* If the string consists entirely of plain ASCII characters, no need to

View File

@ -71,6 +71,7 @@ main(int argc, char **argv)
RestoreOptions *opts;
int c;
int exit_code;
int numWorkers = 1;
Archive *AH;
char *inputFileSpec;
static int disable_triggers = 0;
@ -182,7 +183,7 @@ main(int argc, char **argv)
break;
case 'j': /* number of restore jobs */
opts->number_of_jobs = atoi(optarg);
numWorkers = atoi(optarg);
break;
case 'l': /* Dump the TOC summary */
@ -313,7 +314,7 @@ main(int argc, char **argv)
}
/* Can't do single-txn mode with multiple connections */
if (opts->single_txn && opts->number_of_jobs > 1)
if (opts->single_txn && numWorkers > 1)
{
fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"),
progname);
@ -372,6 +373,18 @@ main(int argc, char **argv)
if (opts->tocFile)
SortTocFromFile(AH, opts);
/* See comments in pg_dump.c */
#ifdef WIN32
if (numWorkers > MAXIMUM_WAIT_OBJECTS)
{
fprintf(stderr, _("%s: maximum number of parallel jobs is %d\n"),
progname, MAXIMUM_WAIT_OBJECTS);
exit(1);
}
#endif
AH->numWorkers = numWorkers;
if (opts->tocSummary)
PrintTOCSummary(AH, opts);
else

View File

@ -395,6 +395,7 @@ sub mkvcbuild
$psql->AddIncludeDir('src\bin\pg_dump');
$psql->AddIncludeDir('src\backend');
$psql->AddFile('src\bin\psql\psqlscan.l');
$psql->AddLibrary('ws2_32.lib');
my $pgdump = AddSimpleFrontend('pg_dump', 1);
$pgdump->AddIncludeDir('src\backend');
@ -403,6 +404,7 @@ sub mkvcbuild
$pgdump->AddFile('src\bin\pg_dump\pg_dump_sort.c');
$pgdump->AddFile('src\bin\pg_dump\keywords.c');
$pgdump->AddFile('src\backend\parser\kwlookup.c');
$pgdump->AddLibrary('ws2_32.lib');
my $pgdumpall = AddSimpleFrontend('pg_dump', 1);
@ -419,6 +421,7 @@ sub mkvcbuild
$pgdumpall->AddFile('src\bin\pg_dump\dumputils.c');
$pgdumpall->AddFile('src\bin\pg_dump\keywords.c');
$pgdumpall->AddFile('src\backend\parser\kwlookup.c');
$pgdumpall->AddLibrary('ws2_32.lib');
my $pgrestore = AddSimpleFrontend('pg_dump', 1);
$pgrestore->{name} = 'pg_restore';
@ -426,6 +429,7 @@ sub mkvcbuild
$pgrestore->AddFile('src\bin\pg_dump\pg_restore.c');
$pgrestore->AddFile('src\bin\pg_dump\keywords.c');
$pgrestore->AddFile('src\backend\parser\kwlookup.c');
$pgrestore->AddLibrary('ws2_32.lib');
my $zic = $solution->AddProject('zic', 'exe', 'utils');
$zic->AddFiles('src\timezone', 'zic.c', 'ialloc.c', 'scheck.c',
@ -572,6 +576,7 @@ sub mkvcbuild
$proj->AddIncludeDir('src\bin\psql');
$proj->AddReference($libpq, $libpgport, $libpgcommon);
$proj->AddResourceFile('src\bin\scripts', 'PostgreSQL Utility');
$proj->AddLibrary('ws2_32.lib');
}
# Regression DLL and EXE