diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index e7be7ec1179..104401d0feb 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -283,27 +283,31 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	do
 	{
 		/*
-		 * Clear the pipe, then check if the latch is set already. If someone
-		 * sets the latch between this and the poll()/select() below, the
-		 * setter will write a byte to the pipe (or signal us and the signal
-		 * handler will do that), and the poll()/select() will return
-		 * immediately.
+		 * Check if the latch is set already. If so, leave loop immediately,
+		 * avoid blocking again. We don't attempt to report any other events
+		 * that might also be satisfied.
+		 *
+		 * If someone sets the latch between this and the poll()/select()
+		 * below, the setter will write a byte to the pipe (or signal us and
+		 * the signal handler will do that), and the poll()/select() will
+		 * return immediately.
+		 *
+		 * If there's a pending byte in the self pipe, we'll notice whenever
+		 * blocking. Only clearing the pipe in that case avoids having to
+		 * drain it every time WaitLatchOrSocket() is used. Should the
+		 * pipe-buffer fill up we're still ok, because the pipe is in
+		 * nonblocking mode. It's unlikely for that to happen, because the
+		 * self pipe isn't filled unless we're blocking (waiting = true), or
+		 * from inside a signal handler in latch_sigusr1_handler().
 		 *
 		 * Note: we assume that the kernel calls involved in drainSelfPipe()
 		 * and SetLatch() will provide adequate synchronization on machines
 		 * with weak memory ordering, so that we cannot miss seeing is_set if
 		 * the signal byte is already in the pipe when we drain it.
 		 */
-		drainSelfPipe();
-
 		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
 		{
 			result |= WL_LATCH_SET;
-
-			/*
-			 * Leave loop immediately, avoid blocking again. We don't attempt
-			 * to report any other events that might also be satisfied.
-			 */
 			break;
 		}
 
@@ -313,24 +317,26 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		 */
 #if defined(LATCH_USE_POLL)
 		nfds = 0;
+
+		/* selfpipe is always in pfds[0] */
+		pfds[0].fd = selfpipe_readfd;
+		pfds[0].events = POLLIN;
+		pfds[0].revents = 0;
+		nfds++;
+
 		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
 		{
-			/* socket, if used, is always in pfds[0] */
-			pfds[0].fd = sock;
-			pfds[0].events = 0;
+			/* socket, if used, is always in pfds[1] */
+			pfds[1].fd = sock;
+			pfds[1].events = 0;
 			if (wakeEvents & WL_SOCKET_READABLE)
-				pfds[0].events |= POLLIN;
+				pfds[1].events |= POLLIN;
 			if (wakeEvents & WL_SOCKET_WRITEABLE)
-				pfds[0].events |= POLLOUT;
-			pfds[0].revents = 0;
+				pfds[1].events |= POLLOUT;
+			pfds[1].revents = 0;
 			nfds++;
 		}
 
-		pfds[nfds].fd = selfpipe_readfd;
-		pfds[nfds].events = POLLIN;
-		pfds[nfds].revents = 0;
-		nfds++;
-
 		if (wakeEvents & WL_POSTMASTER_DEATH)
 		{
 			/* postmaster fd, if used, is always in pfds[nfds - 1] */
@@ -364,19 +370,27 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		else
 		{
 			/* at least one event occurred, so check revents values */
+
+			if (pfds[0].revents & POLLIN)
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+			}
+
 			if ((wakeEvents & WL_SOCKET_READABLE) &&
-				(pfds[0].revents & POLLIN))
+				(pfds[1].revents & POLLIN))
 			{
 				/* data available in socket, or EOF/error condition */
 				result |= WL_SOCKET_READABLE;
 			}
 			if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
-				(pfds[0].revents & POLLOUT))
+				(pfds[1].revents & POLLOUT))
 			{
 				/* socket is writable */
 				result |= WL_SOCKET_WRITEABLE;
 			}
-			if (pfds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
+			if ((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) &&
+				(pfds[1].revents & (POLLHUP | POLLERR | POLLNVAL)))
 			{
 				/* EOF/error condition */
 				if (wakeEvents & WL_SOCKET_READABLE)
@@ -468,6 +482,11 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		else
 		{
 			/* at least one event occurred, so check masks */
+			if (FD_ISSET(selfpipe_readfd, &input_mask))
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+			}
 			if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
 			{
 				/* data available in socket, or EOF */
@@ -498,6 +517,16 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		}
 #endif /* LATCH_USE_SELECT */
 
+		/*
+		 * Check again whether the latch is set: the arrival of a signal or
+		 * self-pipe byte might be what stopped our sleep. It's not required
+		 * for correctness to report the latch as set (we'd just loop if
+		 * there's no other event), but it seems good to report an arrived
+		 * latch asap. This also avoids recomputing the current timestamp.
+		 */
+		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
+			result |= WL_LATCH_SET;
+
 		/* If we're not done, update cur_timeout for next iteration */
 		if (result == 0 && (wakeEvents & WL_TIMEOUT))
 		{
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index b1b071339ee..bbf1b24bdf3 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -181,14 +181,11 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	do
 	{
 		/*
-		 * Reset the event, and check if the latch is set already. If someone
-		 * sets the latch between this and the WaitForMultipleObjects() call
-		 * below, the setter will set the event and WaitForMultipleObjects()
-		 * will return immediately.
+		 * The comment in unix_latch.c's equivalent of this applies here as
+		 * well, at least after mentally replacing the self-pipe with the
+		 * Windows event. There's no danger of overflowing, as "Setting an
+		 * event that is already set has no effect."
 		 */
-		if (!ResetEvent(latchevent))
-			elog(ERROR, "ResetEvent failed: error code %lu", GetLastError());
-
 		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
 		{
 			result |= WL_LATCH_SET;
@@ -217,9 +214,13 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 		else if (rc == WAIT_OBJECT_0 + 1)
 		{
 			/*
-			 * Latch is set. We'll handle that on next iteration of loop, but
-			 * let's not waste the cycles to update cur_timeout below.
+			 * Reset the event. We'll re-check the (potentially set) latch on
+			 * the next iteration of the loop, but let's not waste the cycles
+			 * to update cur_timeout below.
 			 */
+			if (!ResetEvent(latchevent))
+				elog(ERROR, "ResetEvent failed: error code %lu", GetLastError());
+
 			continue;
 		}
 		else if ((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) &&
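
For reference, below is a minimal, standalone sketch (not PostgreSQL code) of the self-pipe pattern the patch optimizes: a signal handler writes a byte into a nonblocking pipe, the wait loop always includes the pipe's read end in poll(), and the pipe is drained only when poll() actually reports it readable. All names (wakeup_pipe, wakeup_handler, drain_pipe, wait_for_wakeup) are illustrative, and error handling is pared down.

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static int	wakeup_pipe[2];		/* [0] read end, [1] write end, both nonblocking */

/* Signal handler: queue a wakeup by writing one byte into the pipe. */
static void
wakeup_handler(int signo)
{
	int			save_errno = errno;

	(void) signo;
	/* If the pipe is full, a wakeup is already pending; ignore the error. */
	(void) write(wakeup_pipe[1], "", 1);
	errno = save_errno;
}

/* Empty the pipe; it is in nonblocking mode, so this never blocks. */
static void
drain_pipe(void)
{
	char		buf[64];

	while (read(wakeup_pipe[0], buf, sizeof(buf)) > 0)
		;
}

/*
 * Wait for a wakeup or timeout.  The pipe is drained only when poll()
 * reports data in it, mirroring the behaviour introduced by the patch.
 * (The timeout restarts after EINTR; good enough for a sketch.)
 */
static int
wait_for_wakeup(int timeout_ms)
{
	struct pollfd pfd;
	int			rc;

	pfd.fd = wakeup_pipe[0];
	pfd.events = POLLIN;
	pfd.revents = 0;

	do
	{
		rc = poll(&pfd, 1, timeout_ms);
	} while (rc < 0 && errno == EINTR);

	if (rc > 0 && (pfd.revents & POLLIN))
		drain_pipe();
	return rc;					/* 0 on timeout, >0 on wakeup */
}

int
main(void)
{
	struct sigaction sa;

	if (pipe(wakeup_pipe) != 0)
		return 1;
	fcntl(wakeup_pipe[0], F_SETFL, O_NONBLOCK);
	fcntl(wakeup_pipe[1], F_SETFL, O_NONBLOCK);

	sa.sa_handler = wakeup_handler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;
	sigaction(SIGUSR1, &sa, NULL);

	printf("pid %d: waiting up to 10s; send SIGUSR1 to wake up early\n",
		   (int) getpid());
	if (wait_for_wakeup(10 * 1000) > 0)
		printf("woken by signal\n");
	else
		printf("timed out\n");
	return 0;
}

The same reasoning as in the patch applies here: because the write end is nonblocking, a full pipe simply means a wakeup is already pending, and draining lazily saves the extra read() syscalls on every wait that has no pending notification.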