diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 6389e9dc65a..54a445cfd62 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -2,20 +2,61 @@ * * parallel.c * - * Parallel support for the pg_dump archiver + * Parallel support for pg_dump and pg_restore * * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * The author is not responsible for loss or damages that may - * result from its use. - * * IDENTIFICATION * src/bin/pg_dump/parallel.c * *------------------------------------------------------------------------- */ +/* + * Parallel operation works like this: + * + * The original, master process calls ParallelBackupStart(), which forks off + * the desired number of worker processes, which each enter WaitForCommands(). + * + * The master process dispatches an individual work item to one of the worker + * processes in DispatchJobForTocEntry(). That calls + * AH->MasterStartParallelItemPtr, a routine of the output format. This + * function's arguments are the parent's archive handle AH (containing the full + * catalog information), the TocEntry that the worker should work on and a + * T_Action value indicating whether this is a backup or a restore task. The + * function simply converts the TocEntry assignment into a command string that + * is then sent over to the worker process. In the simplest case that would be + * something like "DUMP 1234", with 1234 being the TocEntry id. + * + * The worker process receives and decodes the command and passes it to the + * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr, + * which are routines of the current archive format. That routine performs + * the required action (dump or restore) and returns a malloc'd status string. + * The status string is passed back to the master where it is interpreted by + * AH->MasterEndParallelItemPtr, another format-specific routine. 
That + * function can update state or catalog information on the master's side, + * depending on the reply from the worker process. In the end it returns a + * status code, which is 0 for successful execution. + * + * Remember that we have forked off the workers only after we have read in + * the catalog. That's why our worker processes can also access the catalog + * information. (In the Windows case, the workers are threads in the same + * process. To avoid problems, they work with cloned copies of the Archive + * data structure; see init_spawned_worker_win32().) + * + * In the master process, the workerStatus field for each worker has one of + * the following values: + * WRKR_IDLE: it's waiting for a command + * WRKR_WORKING: it's been sent a command + * WRKR_FINISHED: it's returned a result + * WRKR_TERMINATED: process ended + * The FINISHED state indicates that the worker is idle, but we've not yet + * dealt with the status code it returned from the prior command. + * ReapWorkerStatus() extracts the unhandled command status value and sets + * the workerStatus back to WRKR_IDLE. 
+ */ + #include "postgres_fe.h" #include "pg_backup_utils.h" @@ -29,15 +70,11 @@ #include <fcntl.h> #endif +/* Mnemonic macros for indexing the fd array returned by pipe(2) */ #define PIPE_READ 0 #define PIPE_WRITE 1 -/* file-scope variables */ #ifdef WIN32 -static unsigned int tMasterThreadId = 0; -static HANDLE termEvent = INVALID_HANDLE_VALUE; -static int pgpipe(int handles[2]); -static int piperead(int s, char *buf, int len); /* * Structure to hold info passed by _beginthreadex() to the function it calls @@ -47,25 +84,34 @@ typedef struct { ArchiveHandle *AH; RestoreOptions *ropt; - int worker; int pipeRead; int pipeWrite; } WorkerInfo; +/* Windows implementation of pipe access */ +static int pgpipe(int handles[2]); +static int piperead(int s, char *buf, int len); #define pipewrite(a,b,c) send(a,b,c,0) -#else + +#else /* !WIN32 */ + /* - * aborting is only ever used in the master, the workers are fine with just - * wantAbort. + * Variables for handling signals. aborting is only ever used in the master, + * the workers just need wantAbort. */ static bool aborting = false; static volatile sig_atomic_t wantAbort = 0; +/* Non-Windows implementation of pipe access */ #define pgpipe(a) pipe(a) #define piperead(a,b,c) read(a,b,c) #define pipewrite(a,b,c) write(a,b,c) -#endif +#endif /* WIN32 */ + +/* + * State info for archive_close_connection() shutdown callback. 
+ */ typedef struct ShutdownInformation { ParallelState *pstate; @@ -74,21 +120,27 @@ typedef struct ShutdownInformation static ShutdownInformation shutdown_info; +#ifdef WIN32 +/* file-scope variables */ +static unsigned int tMasterThreadId = 0; +static HANDLE termEvent = INVALID_HANDLE_VALUE; +static DWORD tls_index; + +/* globally visible variables (needed by exit_nicely) */ +bool parallel_init_done = false; +DWORD mainThreadId; +#endif /* WIN32 */ + static const char *modulename = gettext_noop("parallel archiver"); +/* Local function prototypes */ static ParallelSlot *GetMyPSlot(ParallelState *pstate); static void archive_close_connection(int code, void *arg); static void ShutdownWorkersHard(ParallelState *pstate); static void WaitForTerminatingWorkers(ParallelState *pstate); - -#ifndef WIN32 -static void sigTermHandler(int signum); -#endif -static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker, - RestoreOptions *ropt); +static void RunWorker(ArchiveHandle *AH, int pipefd[2], RestoreOptions *ropt); static bool HasEveryWorkerTerminated(ParallelState *pstate); - -static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te); +static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te); static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); static char *getMessageFromMaster(int pipefd[2]); static void sendMessageToMaster(int pipefd[2], const char *str); @@ -104,14 +156,10 @@ static char *readMessageFromPipe(int fd); #define messageEquals(msg, pattern) \ (strcmp(msg, pattern) == 0) -#ifdef WIN32 -static void shutdown_parallel_dump_utils(int code, void *unused); -bool parallel_init_done = false; -static DWORD tls_index; -DWORD mainThreadId; -#endif - +/* + * Shutdown callback to clean up socket access + */ #ifdef WIN32 static void shutdown_parallel_dump_utils(int code, void *unused) @@ -122,6 +170,11 @@ shutdown_parallel_dump_utils(int code, void *unused) } #endif +/* + * Initialize parallel dump support --- should be called early in 
process + * startup. (Currently, this is called whether or not we intend parallel + * activity.) + */ void init_parallel_dump_utils(void) { @@ -131,32 +184,43 @@ init_parallel_dump_utils(void) WSADATA wsaData; int err; + /* Prepare for threaded operation */ tls_index = TlsAlloc(); mainThreadId = GetCurrentThreadId(); + + /* Initialize socket access */ err = WSAStartup(MAKEWORD(2, 2), &wsaData); if (err != 0) { fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err); exit_nicely(1); } + /* ... and arrange to shut it down at exit */ on_exit_nicely(shutdown_parallel_dump_utils, NULL); parallel_init_done = true; } #endif } +/* + * Find the ParallelSlot for the current worker process or thread. + * + * Returns NULL if no matching slot is found (this implies we're the master). + */ static ParallelSlot * GetMyPSlot(ParallelState *pstate) { int i; for (i = 0; i < pstate->numWorkers; i++) + { #ifdef WIN32 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId()) #else if (pstate->parallelSlot[i].pid == getpid()) #endif return &(pstate->parallelSlot[i]); + } return NULL; } @@ -164,27 +228,25 @@ GetMyPSlot(ParallelState *pstate) /* * A thread-local version of getLocalPQExpBuffer(). * - * Non-reentrant but reduces memory leakage. (On Windows the memory leakage - * will be one buffer per thread, which is at least better than one per call). + * Non-reentrant but reduces memory leakage: we'll consume one buffer per + * thread, which is much better than one per fmtId/fmtQualifiedId call. */ +#ifdef WIN32 static PQExpBuffer getThreadLocalPQExpBuffer(void) { /* * The Tls code goes awry if we use a static var, so we provide for both - * static and auto, and omit any use of the static var when using Tls. + * static and auto, and omit any use of the static var when using Tls. We + * rely on TlsGetValue() to return 0 if the value is not yet set. 
*/ static PQExpBuffer s_id_return = NULL; PQExpBuffer id_return; -#ifdef WIN32 if (parallel_init_done) - id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */ + id_return = (PQExpBuffer) TlsGetValue(tls_index); else id_return = s_id_return; -#else - id_return = s_id_return; -#endif if (id_return) /* first time through? */ { @@ -195,24 +257,19 @@ getThreadLocalPQExpBuffer(void) { /* new buffer */ id_return = createPQExpBuffer(); -#ifdef WIN32 if (parallel_init_done) TlsSetValue(tls_index, id_return); else s_id_return = id_return; -#else - s_id_return = id_return; -#endif - } return id_return; } +#endif /* WIN32 */ /* - * pg_dump and pg_restore register the Archive pointer for the exit handler - * (called from exit_nicely). This function mainly exists so that we can - * keep shutdown_info in file scope only. + * pg_dump and pg_restore call this to register the cleanup handler + * as soon as they've created the ArchiveHandle. */ void on_exit_close_archive(Archive *AHX) @@ -282,12 +339,19 @@ archive_close_connection(int code, void *arg) } /* + * Check to see if we've been told to abort, and exit the process/thread if + * so. We don't print any error message; that would just clutter the screen. + * * If we have one worker that terminates for some reason, we'd like the other * threads to terminate as well (and not finish with their 70 GB table dump - * first...). Now in UNIX we can just kill these processes, and let the signal - * handler set wantAbort to 1. In Windows we set a termEvent and this serves - * as the signal for everyone to terminate. We don't print any error message, - * that would just clutter the screen. + * first...). In Unix, the master sends SIGTERM and the worker's signal + * handler sets wantAbort to 1. In Windows we set a termEvent and this serves + * as the signal for worker threads to exit. 
Note that while we check this + * fairly frequently during data transfers, an idle worker doesn't come here + * at all, so additional measures are needed to force shutdown. + * + * XXX in parallel restore, slow server-side operations like CREATE INDEX + * are not interrupted by anything we do here. This needs more work. */ void checkAborting(ArchiveHandle *AH) @@ -301,7 +365,7 @@ checkAborting(ArchiveHandle *AH) } /* - * Shut down any remaining workers, waiting for them to finish. + * Forcibly shut down any remaining workers, waiting for them to finish. */ static void ShutdownWorkersHard(ParallelState *pstate) @@ -393,23 +457,24 @@ WaitForTerminatingWorkers(ParallelState *pstate) } } +/* + * Signal handler (Unix only) + */ #ifndef WIN32 -/* Signal handling (UNIX only) */ static void -sigTermHandler(int signum) +sigTermHandler(SIGNAL_ARGS) { wantAbort = 1; } #endif /* - * This function is called by both UNIX and Windows variants to set up + * This function is called by both Unix and Windows variants to set up * and run a worker process. Caller should exit the process (or thread) * upon return. */ static void -SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker, - RestoreOptions *ropt) +RunWorker(ArchiveHandle *AH, int pipefd[2], RestoreOptions *ropt) { /* * Call the setup worker function that's defined in the ArchiveHandle. @@ -418,33 +483,45 @@ SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker, Assert(AH->connection != NULL); + /* + * Execute commands until done. + */ WaitForCommands(AH, pipefd); } +/* + * Thread base function for Windows + */ #ifdef WIN32 static unsigned __stdcall init_spawned_worker_win32(WorkerInfo *wi) { ArchiveHandle *AH; int pipefd[2] = {wi->pipeRead, wi->pipeWrite}; - int worker = wi->worker; RestoreOptions *ropt = wi->ropt; + /* + * Clone the archive so that we have our own state to work with, and in + * particular our own database connection. 
+ */ AH = CloneArchive(wi->AH); free(wi); - SetupWorker(AH, pipefd, worker, ropt); + /* Run the worker ... */ + RunWorker(AH, pipefd, ropt); + + /* Clean up and exit the thread */ DeCloneArchive(AH); _endthreadex(0); return 0; } -#endif +#endif /* WIN32 */ /* - * This function starts the parallel dump or restore by spawning off the - * worker processes in both Unix and Windows. For Windows, it creates a number - * of threads while it does a fork() on Unix. + * This function starts a parallel dump or restore by spawning off the worker + * processes. For Windows, it creates a number of threads; on Unix the + * workers are created with fork(). */ ParallelState * ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) @@ -474,17 +551,21 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) * set and falls back to AHX otherwise. */ shutdown_info.pstate = pstate; - getLocalPQExpBuffer = getThreadLocalPQExpBuffer; #ifdef WIN32 + /* Set up thread management state */ tMasterThreadId = GetCurrentThreadId(); termEvent = CreateEvent(NULL, true, false, "Terminate"); + /* Make fmtId() and fmtQualifiedId() use thread-local storage */ + getLocalPQExpBuffer = getThreadLocalPQExpBuffer; #else + /* Set up signal handling state */ signal(SIGTERM, sigTermHandler); signal(SIGINT, sigTermHandler); signal(SIGQUIT, sigTermHandler); #endif + /* Create desired number of workers */ for (i = 0; i < pstate->numWorkers; i++) { #ifdef WIN32 @@ -496,6 +577,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) int pipeMW[2], pipeWM[2]; + /* Create communication pipes for this worker */ if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) exit_horribly(modulename, "could not create communication channels: %s\n", @@ -514,11 +596,10 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE]; #ifdef WIN32 - /* Allocate a new structure for every worker */ + /* Create transient structure to pass args to worker function */ 
wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo)); wi->ropt = ropt; - wi->worker = i; wi->AH = AH; wi->pipeRead = pipeMW[PIPE_READ]; wi->pipeWrite = pipeWM[PIPE_WRITE]; @@ -526,7 +607,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, wi, 0, &(pstate->parallelSlot[i].threadId)); pstate->parallelSlot[i].hThread = handle; -#else +#else /* !WIN32 */ pid = fork(); if (pid == 0) { @@ -539,15 +620,6 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) pstate->parallelSlot[i].pid = getpid(); - /* - * Call CloneArchive on Unix as well even though technically we - * don't need to because fork() gives us a copy in our own address - * space already. But CloneArchive resets the state information - * and also clones the database connection (for parallel dump) - * which both seem kinda helpful. - */ - pstate->parallelSlot[i].args->AH = CloneArchive(AH); - /* close read end of Worker -> Master */ closesocket(pipeWM[PIPE_READ]); /* close write end of Master -> Worker */ @@ -563,31 +635,43 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) closesocket(pstate->parallelSlot[j].pipeWrite); } - SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt); + /* + * Call CloneArchive on Unix as well as Windows, even though + * technically we don't need to because fork() gives us a copy in + * our own address space already. But CloneArchive resets the + * state information and also clones the database connection which + * both seem kinda helpful. + */ + pstate->parallelSlot[i].args->AH = CloneArchive(AH); + /* Run the worker ... 
*/ + RunWorker(pstate->parallelSlot[i].args->AH, pipefd, ropt); + + /* We can just exit(0) when done */ exit(0); } else if (pid < 0) + { /* fork failed */ exit_horribly(modulename, "could not create worker process: %s\n", strerror(errno)); + } - /* we are the Master, pid > 0 here */ - Assert(pid > 0); + /* In Master after successful fork */ + pstate->parallelSlot[i].pid = pid; /* close read end of Master -> Worker */ closesocket(pipeMW[PIPE_READ]); /* close write end of Worker -> Master */ closesocket(pipeWM[PIPE_WRITE]); - - pstate->parallelSlot[i].pid = pid; -#endif +#endif /* WIN32 */ } /* * Having forked off the workers, disable SIGPIPE so that master isn't - * killed if it tries to send a command to a dead worker. + * killed if it tries to send a command to a dead worker. We don't want + * the workers to inherit this setting, though. */ #ifndef WIN32 signal(SIGPIPE, SIG_IGN); @@ -597,99 +681,45 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) } /* - * Tell all of our workers to terminate. - * - * Pretty straightforward routine, first we tell everyone to terminate, then - * we listen to the workers' replies and finally close the sockets that we - * have used for communication. + * Close down a parallel dump or restore. */ void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) { int i; + /* No work if non-parallel */ if (pstate->numWorkers == 1) return; + /* There should not be any unfinished jobs */ Assert(IsEveryWorkerIdle(pstate)); - /* close the sockets so that the workers know they can exit */ + /* Close the sockets so that the workers know they can exit */ for (i = 0; i < pstate->numWorkers; i++) { closesocket(pstate->parallelSlot[i].pipeRead); closesocket(pstate->parallelSlot[i].pipeWrite); } + + /* Wait for them to exit */ WaitForTerminatingWorkers(pstate); /* - * Remove the pstate again, so the exit handler in the parent will now - * again fall back to closing AH->connection (if connected). 
+ * Unlink pstate from shutdown_info, so the exit handler will again fall + * back to closing AH->connection (if connected). */ shutdown_info.pstate = NULL; + /* Release state (mere neatnik-ism, since we're about to terminate) */ free(pstate->parallelSlot); free(pstate); } - /* - * The sequence is the following (for dump, similar for restore): + * Dispatch a job to some free worker (caller must ensure there is one!) * - * The master process starts the parallel backup in ParllelBackupStart, this - * forks the worker processes which enter WaitForCommand(). - * - * The master process dispatches an individual work item to one of the worker - * processes in DispatchJobForTocEntry(). It calls - * AH->MasterStartParallelItemPtr, a routine of the output format. This - * function's arguments are the parents archive handle AH (containing the full - * catalog information), the TocEntry that the worker should work on and a - * T_Action act indicating whether this is a backup or a restore item. The - * function then converts the TocEntry assignment into a string that is then - * sent over to the worker process. In the simplest case that would be - * something like "DUMP 1234", with 1234 being the TocEntry id. - * - * The worker receives the message in the routine pointed to by - * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to - * corresponding routines of the respective output format, e.g. - * _WorkerJobDumpDirectory(). - * - * Remember that we have forked off the workers only after we have read in the - * catalog. That's why our worker processes can also access the catalog - * information. Now they re-translate the textual representation to a TocEntry - * on their side and do the required action (restore or dump). - * - * The result is again a textual string that is sent back to the master and is - * interpreted by AH->MasterEndParallelItemPtr. 
This function can update state - * or catalog information on the master's side, depending on the reply from - * the worker process. In the end it returns status which is 0 for successful - * execution. - * - * --------------------------------------------------------------------- - * Master Worker - * - * enters WaitForCommands() - * DispatchJobForTocEntry(...te...) - * - * [ Worker is IDLE ] - * - * arg = (MasterStartParallelItemPtr)() - * send: DUMP arg - * receive: DUMP arg - * str = (WorkerJobDumpPtr)(arg) - * [ Worker is WORKING ] ... gets te from arg ... - * ... dump te ... - * send: OK DUMP info - * - * In ListenToWorkers(): - * - * [ Worker is FINISHED ] - * receive: OK DUMP info - * status = (MasterEndParallelItemPtr)(info) - * - * In ReapWorkerStatus(&ptr): - * *ptr = status; - * [ Worker is IDLE ] - * --------------------------------------------------------------------- + * te is the TocEntry to be processed, act is the action to be taken on it. */ void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, @@ -699,20 +729,24 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, char *arg; /* our caller makes sure that at least one worker is idle */ - Assert(GetIdleWorker(pstate) != NO_SLOT); worker = GetIdleWorker(pstate); Assert(worker != NO_SLOT); + /* Construct and send command string */ arg = (AH->MasterStartParallelItemPtr) (AH, te, act); sendMessageToWorker(pstate, worker, arg); + /* XXX aren't we leaking string here? (no, because it's static. Ick.) */ + + /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; pstate->parallelSlot[worker].args->te = te; } /* - * Find the first free parallel slot (if any). + * Find an idle worker and return its slot number. + * Return NO_SLOT if none are idle. 
*/ int GetIdleWorker(ParallelState *pstate) @@ -720,13 +754,15 @@ GetIdleWorker(ParallelState *pstate) int i; for (i = 0; i < pstate->numWorkers; i++) + { if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) return i; + } return NO_SLOT; } /* - * Return true iff every worker process is in the WRKR_TERMINATED state. + * Return true iff every worker is in the WRKR_TERMINATED state. */ static bool HasEveryWorkerTerminated(ParallelState *pstate) @@ -734,8 +770,10 @@ HasEveryWorkerTerminated(ParallelState *pstate) int i; for (i = 0; i < pstate->numWorkers; i++) + { if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) return false; + } return true; } @@ -748,39 +786,51 @@ IsEveryWorkerIdle(ParallelState *pstate) int i; for (i = 0; i < pstate->numWorkers; i++) + { if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE) return false; + } return true; } /* - * --------------------------------------------------------------------- - * One danger of the parallel backup is a possible deadlock: + * Acquire lock on a table to be dumped by a worker process. + * + * The master process is already holding an ACCESS SHARE lock. Ordinarily + * it's no problem for a worker to get one too, but if anything else besides + * pg_dump is running, there's a possible deadlock: * * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode. * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted * because the master holds a conflicting ACCESS SHARE lock). - * 3) The worker process also requests an ACCESS SHARE lock to read the table. - * The worker's not granted that lock but is enqueued behind the ACCESS - * EXCLUSIVE lock request. - * --------------------------------------------------------------------- + * 3) A worker process also requests an ACCESS SHARE lock to read the table. + * The worker is enqueued behind the ACCESS EXCLUSIVE lock request. + * 4) Now we have a deadlock, since the master is effectively waiting for + * the worker. 
The server cannot detect that, however. * - * Now what we do here is to just request a lock in ACCESS SHARE but with - * NOWAIT in the worker prior to touching the table. If we don't get the lock, + * To prevent an infinite wait, prior to touching a table in a worker, request + * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock, * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and - * are good to just fail the whole backup because we have detected a deadlock. + * so we have a deadlock. We must fail the backup in that case. */ static void -lockTableNoWait(ArchiveHandle *AH, TocEntry *te) +lockTableForWorker(ArchiveHandle *AH, TocEntry *te) { Archive *AHX = (Archive *) AH; const char *qualId; - PQExpBuffer query = createPQExpBuffer(); + PQExpBuffer query; PGresult *res; - Assert(AH->format == archDirectory); - Assert(strcmp(te->desc, "BLOBS") != 0); + /* Nothing to do for BLOBS */ + if (strcmp(te->desc, "BLOBS") == 0) + return; + query = createPQExpBuffer(); + + /* + * XXX this is an unbelievably expensive substitute for knowing how to dig + * a table name out of a TocEntry. + */ appendPQExpBuffer(query, "SELECT pg_namespace.nspname," " pg_class.relname " @@ -810,8 +860,8 @@ lockTableNoWait(ArchiveHandle *AH, TocEntry *te) if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) exit_horribly(modulename, "could not obtain lock on relation \"%s\"\n" - "This usually means that someone requested an ACCESS EXCLUSIVE lock " - "on the table after the pg_dump parent process had gotten the " + "This usually means that someone requested an ACCESS EXCLUSIVE lock " + "on the table after the pg_dump parent process had gotten the " "initial ACCESS SHARE lock on the table.\n", qualId); PQclear(res); @@ -819,11 +869,9 @@ lockTableNoWait(ArchiveHandle *AH, TocEntry *te) } /* - * That's the main routine for the worker. - * When it starts up it enters this routine and waits for commands from the - * master process. 
After having processed a command it comes back to here to - * wait for the next command. Finally it will receive a TERMINATE command and - * exit. + * WaitForCommands: main routine for a worker process. + * + * Read and execute commands from the master until we see EOF on the pipe. */ static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]) @@ -831,13 +879,14 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) char *command; DumpId dumpId; int nBytes; - char *str = NULL; + char *str; TocEntry *te; for (;;) { if (!(command = getMessageFromMaster(pipefd))) { + /* EOF ... clean up */ PQfinish(AH->connection); AH->connection = NULL; return; @@ -845,55 +894,44 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) if (messageStartsWith(command, "DUMP ")) { - Assert(AH->format == archDirectory); + /* Decode the command */ sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes); Assert(nBytes == strlen(command) - strlen("DUMP ")); - te = getTocEntryByDumpId(AH, dumpId); Assert(te != NULL); - /* - * Lock the table but with NOWAIT. Note that the parent is already - * holding a lock. If we cannot acquire another ACCESS SHARE MODE - * lock, then somebody else has requested an exclusive lock in the - * meantime. lockTableNoWait dies in this case to prevent a - * deadlock. - */ - if (strcmp(te->desc, "BLOBS") != 0) - lockTableNoWait(AH, te); + /* Acquire lock on this table within the worker's session */ + lockTableForWorker(AH, te); - /* - * The message we return here has been pg_malloc()ed and we are - * responsible for free()ing it. 
- */ + /* Perform the dump command */ str = (AH->WorkerJobDumpPtr) (AH, te); - Assert(AH->connection != NULL); + + /* Return status to master */ sendMessageToMaster(pipefd, str); + + /* we are responsible for freeing the status string */ free(str); } else if (messageStartsWith(command, "RESTORE ")) { - Assert(AH->format == archDirectory || AH->format == archCustom); - Assert(AH->connection != NULL); - + /* Decode the command */ sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes); Assert(nBytes == strlen(command) - strlen("RESTORE ")); - te = getTocEntryByDumpId(AH, dumpId); Assert(te != NULL); - /* - * The message we return here has been pg_malloc()ed and we are - * responsible for free()ing it. - */ + /* Perform the restore command */ str = (AH->WorkerJobRestorePtr) (AH, te); - Assert(AH->connection != NULL); + + /* Return status to master */ sendMessageToMaster(pipefd, str); + + /* we are responsible for freeing the status string */ free(str); } else exit_horribly(modulename, - "unrecognized command on communication channel: %s\n", + "unrecognized command received from master: \"%s\"\n", command); /* command was pg_malloc'd and we are responsible for free()ing it. */ @@ -902,18 +940,21 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) } /* - * --------------------------------------------------------------------- - * Note the status change: + * Check for status messages from workers. * - * DispatchJobForTocEntry WRKR_IDLE -> WRKR_WORKING - * ListenToWorkers WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED - * ReapWorkerStatus WRKR_FINISHED -> WRKR_IDLE - * --------------------------------------------------------------------- + * If do_wait is true, wait to get a status message; otherwise, just return + * immediately if there is none available. * - * Just calling ReapWorkerStatus() when all workers are working might or might - * not give you an idle worker because you need to call ListenToWorkers() in - * between and only thereafter ReapWorkerStatus(). 
This is necessary in order - * to get and deal with the status (=result) of the worker's execution. + * When we get a status message, we let MasterEndParallelItemPtr process it, + * then save the resulting status code and switch the worker's state to + * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify + * that the status was "OK" and push the worker back to IDLE state. + * + * XXX Rube Goldberg would be proud of this API, but no one else should be. + * + * XXX is it worth checking for more than one status message per call? + * It seems somewhat unlikely that multiple workers would finish at exactly + * the same time. */ void ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) @@ -921,22 +962,23 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) int worker; char *msg; + /* Try to collect a status message */ msg = getMessageFromWorker(pstate, do_wait, &worker); if (!msg) { + /* If do_wait is true, we must have detected EOF on some socket */ if (do_wait) exit_horribly(modulename, "a worker process died unexpectedly\n"); return; } + /* Process it and update our idea of the worker's status */ if (messageStartsWith(msg, "OK ")) { + TocEntry *te = pstate->parallelSlot[worker].args->te; char *statusString; - TocEntry *te; - pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED; - te = pstate->parallelSlot[worker].args->te; if (messageStartsWith(msg, "OK RESTORE ")) { statusString = msg + strlen("OK RESTORE "); @@ -955,22 +997,23 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) exit_horribly(modulename, "invalid message received from worker: \"%s\"\n", msg); + pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED; } else exit_horribly(modulename, "invalid message received from worker: \"%s\"\n", msg); - /* both Unix and Win32 return pg_malloc()ed space, so we free it */ + /* Free the string returned from getMessageFromWorker */ free(msg); } /* - * This function is executed in 
the master process. + * Check to see if any worker is in WRKR_FINISHED state. If so, + * return its command status code into *status, reset it to IDLE state, + * and return its slot number. Otherwise return NO_SLOT. * - * This function is used to get the return value of a terminated worker - * process. If a process has terminated, its status is stored in *status and - * the id of the worker is returned. + * This function is executed in the master process. */ int ReapWorkerStatus(ParallelState *pstate, int *status) @@ -991,9 +1034,16 @@ ReapWorkerStatus(ParallelState *pstate, int *status) } /* - * This function is executed in the master process. + * Wait, if necessary, until we have at least one idle worker. + * Reap worker status as necessary to move FINISHED workers to IDLE state. * - * It looks for an idle worker process and only returns if there is one. + * We assume that no extra processing is required when reaping a finished + * command, except for checking that the status was OK (zero). + * Caution: that assumption means that this function can only be used in + * parallel dump, not parallel restore, because the latter has a more + * complex set of rules about handling status. + * + * This function is executed in the master process. */ void EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate) @@ -1033,9 +1083,16 @@ EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate) } /* - * This function is executed in the master process. + * Wait for all workers to be idle. + * Reap worker status as necessary to move FINISHED workers to IDLE state. * - * It waits for all workers to terminate. + * We assume that no extra processing is required when reaping a finished + * command, except for checking that the status was OK (zero). + * Caution: that assumption means that this function can only be used in + * parallel dump, not parallel restore, because the latter has a more + * complex set of rules about handling status. 
+ * + * This function is executed in the master process. */ void EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate) @@ -1057,10 +1114,11 @@ EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate) } /* - * This function is executed in the worker process. + * Read one command message from the master, blocking if necessary + * until one is available, and return it as a malloc'd string. + * On EOF, return NULL. * - * It returns the next message on the communication channel, blocking until it - * becomes available. + * This function is executed in worker processes. */ static char * getMessageFromMaster(int pipefd[2]) @@ -1069,9 +1127,9 @@ getMessageFromMaster(int pipefd[2]) } /* - * This function is executed in the worker process. + * Send a status message to the master. * - * It sends a message to the master on the communication channel. + * This function is executed in worker processes. */ static void sendMessageToMaster(int pipefd[2], const char *str) @@ -1085,9 +1143,8 @@ sendMessageToMaster(int pipefd[2], const char *str) } /* - * A select loop that repeats calling select until a descriptor in the read - * set becomes readable. On Windows we have to check for the termination event - * from time to time, on Unix we can just block forever. + * Wait until some descriptor in "workerset" becomes readable. + * Returns -1 on error, else the number of readable descriptors. */ static int select_loop(int maxFd, fd_set *workerset) @@ -1096,13 +1153,13 @@ select_loop(int maxFd, fd_set *workerset) fd_set saveSet = *workerset; #ifdef WIN32 - /* should always be the master */ - Assert(tMasterThreadId == GetCurrentThreadId()); - for (;;) { /* - * sleep a quarter of a second before checking if we should terminate. + * Sleep a quarter of a second before checking if we should terminate. + * + * XXX we're not actually checking for a cancel interrupt ... but we + * should be. 
*/ struct timeval tv = {0, 250000}; @@ -1114,15 +1171,14 @@ select_loop(int maxFd, fd_set *workerset) if (i) break; } -#else /* UNIX */ - +#else /* !WIN32 */ for (;;) { *workerset = saveSet; i = select(maxFd + 1, workerset, NULL, NULL, NULL); /* - * If we Ctrl-C the master process , it's likely that we interrupt + * If we Ctrl-C the master process, it's likely that we interrupt * select() here. The signal handler will set wantAbort == true and * the shutdown journey starts from here. Note that we'll come back * here later when we tell all workers to terminate and read their @@ -1135,19 +1191,25 @@ select_loop(int maxFd, fd_set *workerset) continue; break; } -#endif +#endif /* WIN32 */ return i; } /* + * Check for messages from worker processes. + * + * If a message is available, return it as a malloc'd string, and put the + * index of the sending worker in *worker. + * + * If nothing is available, wait if "do_wait" is true, else return NULL. + * + * If we detect EOF on any socket, we'll return NULL. It's not great that + * that's hard to distinguish from the no-data-available case, but for now + * our one caller is okay with that. + * * This function is executed in the master process. - * - * It returns the next message from the worker on the communication channel, - * optionally blocking (do_wait) until it becomes available. - * - * The id of the worker is returned in *worker. */ static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) @@ -1157,14 +1219,13 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) int maxFd = -1; struct timeval nowait = {0, 0}; + /* construct bitmap of socket descriptors for select() */ FD_ZERO(&workerset); - for (i = 0; i < pstate->numWorkers; i++) { if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) continue; FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); - /* actually WIN32 ignores the first parameter to select()... 
*/ if (pstate->parallelSlot[i].pipeRead > maxFd) maxFd = pstate->parallelSlot[i].pipeRead; } @@ -1181,7 +1242,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) } if (i < 0) - exit_horribly(modulename, "error in ListenToWorkers(): %s\n", strerror(errno)); + exit_horribly(modulename, "select() failed: %s\n", strerror(errno)); for (i = 0; i < pstate->numWorkers; i++) { @@ -1190,6 +1251,16 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) continue; + /* + * Read the message if any. If the socket is ready because of EOF, + * we'll return NULL instead (and the socket will stay ready, so the + * condition will persist). + * + * Note: because this is a blocking read, we'll wait if only part of + * the message is available. Waiting a long time would be bad, but + * since worker status messages are short and are always sent in one + * operation, it shouldn't be a problem in practice. + */ msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); *worker = i; return msg; @@ -1199,9 +1270,9 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) } /* - * This function is executed in the master process. + * Send a command message to the specified worker process. * - * It sends a message to a certain worker on the communication channel. + * This function is executed in the master process. */ static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str) @@ -1212,20 +1283,24 @@ sendMessageToWorker(ParallelState *pstate, int worker, const char *str) { /* * If we're already aborting anyway, don't care if we succeed or not. - * The child might have gone already. + * The child might have gone already. (XXX but if we're aborting + * already, why are we here at all?) 
*/ #ifndef WIN32 if (!aborting) #endif exit_horribly(modulename, - "could not write to the communication channel: %s\n", + "could not write to the communication channel: %s\n", strerror(errno)); } } /* - * The underlying function to read a message from the communication channel - * (fd) with optional blocking (do_wait). + * Read one message from the specified pipe (fd), blocking if necessary + * until one is available, and return it as a malloc'd string. + * On EOF, return NULL. + * + * A "message" on the channel is just a null-terminated string. */ static char * readMessageFromPipe(int fd) @@ -1236,58 +1311,53 @@ readMessageFromPipe(int fd) int ret; /* - * The problem here is that we need to deal with several possibilites: we - * could receive only a partial message or several messages at once. The - * caller expects us to return exactly one message however. - * - * We could either read in as much as we can and keep track of what we - * delivered back to the caller or we just read byte by byte. Once we see - * (char) 0, we know that it's the message's end. This would be quite - * inefficient for more data but since we are reading only on the command - * channel, the performance loss does not seem worth the trouble of - * keeping internal states for different file descriptors. + * In theory, if we let piperead() read multiple bytes, it might give us + * back fragments of multiple messages. (That can't actually occur, since + * neither master nor workers send more than one message without waiting + * for a reply, but we don't wish to assume that here.) For simplicity, + * read a byte at a time until we get the terminating '\0'. This method + * is a bit inefficient, but since this is only used for relatively short + * command and status strings, it shouldn't matter. 
*/ bufsize = 64; /* could be any number */ msg = (char *) pg_malloc(bufsize); - msgsize = 0; for (;;) { - Assert(msgsize <= bufsize); + Assert(msgsize < bufsize); ret = piperead(fd, msg + msgsize, 1); - - /* worker has closed the connection or another error happened */ if (ret <= 0) - break; + break; /* error or connection closure */ Assert(ret == 1); if (msg[msgsize] == '\0') - return msg; + return msg; /* collected whole message */ msgsize++; - if (msgsize == bufsize) + if (msgsize == bufsize) /* enlarge buffer if needed */ { - /* could be any number */ - bufsize += 16; + bufsize += 16; /* could be any number */ msg = (char *) pg_realloc(msg, bufsize); } } - /* - * Worker has closed the connection, make sure to clean up before return - * since we are not returning msg (but did allocate it). - */ + /* Other end has closed the connection */ pg_free(msg); - return NULL; } #ifdef WIN32 + /* - * This is a replacement version of pipe for Win32 which allows returned - * handles to be used in select(). Note that read/write calls must be replaced - * with recv/send. + * This is a replacement version of pipe(2) for Windows which allows the pipe + * handles to be used in select(). + * + * Reads and writes on the pipe must go through piperead()/pipewrite(). + * + * For consistency with Unix we declare the returned handles as "int". + * This is okay even on WIN64 because system handles are not more than + * 32 bits wide, but we do have to do some casting. */ static int pgpipe(int handles[2]) @@ -1342,6 +1412,8 @@ pgpipe(int handles[2]) { write_msg(modulename, "pgpipe: could not connect socket: error code %d\n", WSAGetLastError()); + closesocket(handles[1]); + handles[1] = INVALID_SOCKET; closesocket(s); return -1; } @@ -1358,15 +1430,20 @@ pgpipe(int handles[2]) return 0; } +/* + * Windows implementation of reading from a pipe. 
+ */ static int piperead(int s, char *buf, int len) { int ret = recv(s, buf, len, 0); if (ret < 0 && WSAGetLastError() == WSAECONNRESET) - /* EOF on the pipe! (win32 socket based implementation) */ + { + /* EOF on the pipe! */ ret = 0; + } return ret; } -#endif +#endif /* WIN32 */ diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index b5124a2d45a..b46abb2cb60 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2106,6 +2106,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, return AH; } +/* + * Write out all data (tables & blobs) + */ void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { @@ -2123,15 +2126,18 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { /* * If we are in a parallel backup, then we are always the master - * process. + * process. Dispatch each data-transfer job to a worker. */ EnsureIdleWorker(AH, pstate); - Assert(GetIdleWorker(pstate) != NO_SLOT); DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP); } else WriteDataChunksForTocEntry(AH, te); } + + /* + * If parallel, wait for workers to finish. + */ EnsureWorkersFinished(AH, pstate); } @@ -3573,13 +3579,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, par_list_remove(next_work_item); - Assert(GetIdleWorker(pstate) != NO_SLOT); DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE); } else { /* at least one child is working and we have nothing ready. */ - Assert(!IsEveryWorkerIdle(pstate)); } for (;;)