diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 93a566ba629..438d1b24fc2 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -330,8 +330,8 @@ gather_readnext(GatherState *gatherstate) HeapTuple tup; bool readerdone; - /* Make sure we've read all messages from workers. */ - HandleParallelMessages(); + /* Check for async events, particularly messages from workers. */ + CHECK_FOR_INTERRUPTS(); /* Attempt to read a tuple, but don't block if none is available. */ reader = gatherstate->reader[gatherstate->nextreader]; @@ -388,7 +388,6 @@ gather_readnext(GatherState *gatherstate) /* Nothing to do except wait for developments. */ WaitLatch(MyLatch, WL_LATCH_SET, 0); - CHECK_FOR_INTERRUPTS(); ResetLatch(MyLatch); nvisited = 0; } diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 0dcdee03db5..921242fbc4e 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -172,8 +172,8 @@ mq_putmessage(char msgtype, const char *s, size_t len) break; WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); - CHECK_FOR_INTERRUPTS(); ResetLatch(&MyProc->procLatch); + CHECK_FOR_INTERRUPTS(); } pq_mq_busy = false; diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index 44ede336162..5b32782022b 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -896,11 +896,11 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, */ WaitLatch(MyLatch, WL_LATCH_SET, 0); - /* An interrupt may have occurred while we were waiting. */ - CHECK_FOR_INTERRUPTS(); - /* Reset the latch so we don't spin. */ ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); } else { @@ -993,11 +993,11 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, */ WaitLatch(MyLatch, WL_LATCH_SET, 0); - /* An interrupt may have occurred while we were waiting. */ - CHECK_FOR_INTERRUPTS(); - /* Reset the latch so we don't spin. */ ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); } } @@ -1092,11 +1092,11 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr, /* Wait to be signalled. */ WaitLatch(MyLatch, WL_LATCH_SET, 0); - /* An interrupt may have occurred while we were waiting. */ - CHECK_FOR_INTERRUPTS(); - /* Reset the latch so we don't spin. */ ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); } return result; diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 85d211c0e1c..5179ecc0dbd 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -52,6 +52,22 @@ * do. Otherwise, if someone sets the latch between the check and the * ResetLatch call, you will miss it and Wait will incorrectly block. * + * Another valid coding pattern looks like: + * + * for (;;) + * { + * if (work to do) + * Do Stuff(); // in particular, exit loop if some condition satisfied + * WaitLatch(); + * ResetLatch(); + * } + * + * This is useful to reduce latch traffic if it's expected that the loop's + * termination condition will often be satisfied in the first iteration; + * the cost is an extra loop iteration before blocking when it is not. + * What must be avoided is placing any checks for asynchronous events after + * WaitLatch and before ResetLatch, as that creates a race condition. + * * To wake up the waiter, you must first set a global flag or something * else that the wait loop tests in the "if (work to do)" part, and call * SetLatch *after* that. SetLatch is designed to return quickly if the diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c index 5bd282078cc..143df4eb651 100644 --- a/src/test/modules/test_shm_mq/setup.c +++ b/src/test/modules/test_shm_mq/setup.c @@ -281,11 +281,11 @@ wait_for_workers_to_become_ready(worker_state *wstate, /* Wait to be signalled. */ WaitLatch(MyLatch, WL_LATCH_SET, 0); - /* An interrupt may have occurred while we were waiting. */ - CHECK_FOR_INTERRUPTS(); - /* Reset the latch so we don't spin. */ ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); } if (!result) diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c index 6948e208996..dd34bc7e7f0 100644 --- a/src/test/modules/test_shm_mq/test.c +++ b/src/test/modules/test_shm_mq/test.c @@ -231,8 +231,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS) * for us to do. */ WaitLatch(MyLatch, WL_LATCH_SET, 0); - CHECK_FOR_INTERRUPTS(); ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); } }