1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-24 01:29:19 +03:00

Fix parallel pg_dump/pg_restore for failure to create worker processes.

If we failed to fork a worker process, or to create a communication pipe
for one, WaitForTerminatingWorkers would suffer an assertion failure
in assert-enabled builds, and otherwise crash or go into an infinite
loop.  This was a consequence of not accounting for the startup condition
where we've not yet forked all the workers.

The original bug was that ParallelBackupStart would set workerStatus to
WRKR_IDLE before it had successfully forked a worker.  I made things
worse in commit b7b8cc0cf by not understanding the undocumented fact
that the WRKR_TERMINATED state was also meant to represent the case
where a worker hadn't been started yet: I changed enum T_WorkerStatus
so that *all* the worker slots were initially in WRKR_IDLE state.  But
that change didn't make matters any more broken in practice, since even one
slot in the wrong state would keep WaitForTerminatingWorkers from terminating.

In v10 and later, introduce an explicit T_WorkerStatus value for
worker-not-started, in hopes of preventing future oversights of the
same ilk.  Before that, just document that WRKR_TERMINATED is supposed
to cover that case (partly because it wasn't actively broken, and
partly because the enum is exposed outside parallel.c in those branches,
so there's microscopically more risk involved in changing it).
In all branches, introduce a WORKER_IS_RUNNING status test macro
to hide which T_WorkerStatus values mean that, and be more careful
not to access ParallelSlot fields till we're sure they're valid.

Per report from Vignesh C, though this is my patch not his.
Back-patch to all supported branches.

Discussion: https://postgr.es/m/CALDaNm1Luv-E3sarR+-unz-BjchquHHyfP+YC+2FS2pt_J+wxg@mail.gmail.com
This commit is contained in:
Tom Lane
2020-01-31 14:41:49 -05:00
parent 8b29c75f62
commit 63634883fb

View File

@@ -42,6 +42,7 @@
* *
* In the master process, the workerStatus field for each worker has one of * In the master process, the workerStatus field for each worker has one of
* the following values: * the following values:
* WRKR_NOT_STARTED: we've not yet forked this worker
* WRKR_IDLE: it's waiting for a command * WRKR_IDLE: it's waiting for a command
* WRKR_WORKING: it's working on a command * WRKR_WORKING: it's working on a command
* WRKR_TERMINATED: process ended * WRKR_TERMINATED: process ended
@@ -76,11 +77,15 @@
/* Worker process statuses */ /* Worker process statuses */
typedef enum typedef enum
{ {
WRKR_NOT_STARTED = 0,
WRKR_IDLE, WRKR_IDLE,
WRKR_WORKING, WRKR_WORKING,
WRKR_TERMINATED WRKR_TERMINATED
} T_WorkerStatus; } T_WorkerStatus;
#define WORKER_IS_RUNNING(workerStatus) \
((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
/* /*
* Private per-parallel-worker state (typedef for this is in parallel.h). * Private per-parallel-worker state (typedef for this is in parallel.h).
* *
@@ -417,7 +422,9 @@ ShutdownWorkersHard(ParallelState *pstate)
/* /*
* Close our write end of the sockets so that any workers waiting for * Close our write end of the sockets so that any workers waiting for
* commands know they can exit. * commands know they can exit. (Note: some of the pipeWrite fields might
* still be zero, if we failed to initialize all the workers. Hence, just
* ignore errors here.)
*/ */
for (i = 0; i < pstate->numWorkers; i++) for (i = 0; i < pstate->numWorkers; i++)
closesocket(pstate->parallelSlot[i].pipeWrite); closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -491,7 +498,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
for (j = 0; j < pstate->numWorkers; j++) for (j = 0; j < pstate->numWorkers; j++)
{ {
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED) if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
{ {
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread; lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
nrun++; nrun++;
@@ -927,6 +934,7 @@ ParallelBackupStart(ArchiveHandle *AH)
if (AH->public.numWorkers == 1) if (AH->public.numWorkers == 1)
return pstate; return pstate;
/* Create status arrays, being sure to initialize all fields to 0 */
pstate->te = (TocEntry **) pstate->te = (TocEntry **)
pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
pstate->parallelSlot = (ParallelSlot *) pstate->parallelSlot = (ParallelSlot *)
@@ -976,13 +984,6 @@ ParallelBackupStart(ArchiveHandle *AH)
"could not create communication channels: %s\n", "could not create communication channels: %s\n",
strerror(errno)); strerror(errno));
pstate->te[i] = NULL; /* just for safety */
slot->workerStatus = WRKR_IDLE;
slot->AH = NULL;
slot->callback = NULL;
slot->callback_data = NULL;
/* master's ends of the pipes */ /* master's ends of the pipes */
slot->pipeRead = pipeWM[PIPE_READ]; slot->pipeRead = pipeWM[PIPE_READ];
slot->pipeWrite = pipeMW[PIPE_WRITE]; slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -1000,6 +1001,7 @@ ParallelBackupStart(ArchiveHandle *AH)
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(slot->threadId)); wi, 0, &(slot->threadId));
slot->hThread = handle; slot->hThread = handle;
slot->workerStatus = WRKR_IDLE;
#else /* !WIN32 */ #else /* !WIN32 */
pid = fork(); pid = fork();
if (pid == 0) if (pid == 0)
@@ -1044,6 +1046,7 @@ ParallelBackupStart(ArchiveHandle *AH)
/* In Master after successful fork */ /* In Master after successful fork */
slot->pid = pid; slot->pid = pid;
slot->workerStatus = WRKR_IDLE;
/* close read end of Master -> Worker */ /* close read end of Master -> Worker */
closesocket(pipeMW[PIPE_READ]); closesocket(pipeMW[PIPE_READ]);
@@ -1273,7 +1276,7 @@ GetIdleWorker(ParallelState *pstate)
} }
/* /*
* Return true iff every worker is in the WRKR_TERMINATED state. * Return true iff no worker is running.
*/ */
static bool static bool
HasEveryWorkerTerminated(ParallelState *pstate) HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1282,7 +1285,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
for (i = 0; i < pstate->numWorkers; i++) for (i = 0; i < pstate->numWorkers; i++)
{ {
if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
return false; return false;
} }
return true; return true;
@@ -1618,7 +1621,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
FD_ZERO(&workerset); FD_ZERO(&workerset);
for (i = 0; i < pstate->numWorkers; i++) for (i = 0; i < pstate->numWorkers; i++)
{ {
if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
continue; continue;
FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
if (pstate->parallelSlot[i].pipeRead > maxFd) if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1643,6 +1646,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
{ {
char *msg; char *msg;
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
continue;
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
continue; continue;