diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index ab5ef2573cf..9325b628da3 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -191,8 +191,8 @@ CreateParallelContextForExternalFunction(char *library_name, /* * Establish the dynamic shared memory segment for a parallel context and - * copied state and other bookkeeping information that will need by parallel - * workers into it. + * copy state and other bookkeeping information that will be needed by + * parallel workers into it. */ void InitializeParallelDSM(ParallelContext *pcxt) @@ -271,7 +271,7 @@ InitializeParallelDSM(ParallelContext *pcxt) * parallelism than to fail outright. */ segsize = shm_toc_estimate(&pcxt->estimator); - if (pcxt->nworkers != 0) + if (pcxt->nworkers > 0) pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); if (pcxt->seg != NULL) pcxt->toc = shm_toc_create(PARALLEL_MAGIC, @@ -397,11 +397,13 @@ ReinitializeParallelDSM(ParallelContext *pcxt) char *error_queue_space; int i; - if (pcxt->nworkers_launched == 0) - return; - - WaitForParallelWorkersToFinish(pcxt); - WaitForParallelWorkersToExit(pcxt); + /* Wait for any old workers to exit. */ + if (pcxt->nworkers_launched > 0) + { + WaitForParallelWorkersToFinish(pcxt); + WaitForParallelWorkersToExit(pcxt); + pcxt->nworkers_launched = 0; + } /* Reset a few bits of fixed parallel state to a clean state. */ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); @@ -420,9 +422,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt) shm_mq_set_receiver(mq, MyProc); pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); } - - /* Reset number of workers launched. */ - pcxt->nworkers_launched = 0; } /* @@ -493,6 +492,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) */ any_registrations_failed = true; pcxt->worker[i].bgwhandle = NULL; + pfree(pcxt->worker[i].error_mqh); pcxt->worker[i].error_mqh = NULL; } }