From 50e570a59e7f86bb41f029a66b781fc79b8d50f1 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 14 Feb 2022 16:29:28 +1300 Subject: [PATCH] Add WL_SOCKET_CLOSED for socket shutdown events. Provide a way for WaitEventSet to report that the remote peer has shut down its socket, independently of whether there is any buffered data remaining to be read. This works only on systems where the kernel exposes that information, namely: * WAIT_USE_POLL builds using POLLRDHUP, if available * WAIT_USE_EPOLL builds using EPOLLRDHUP * WAIT_USE_KQUEUE builds using EV_EOF Reviewed-by: Zhihong Yu Reviewed-by: Maksim Milyutin Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru --- src/backend/storage/ipc/latch.c | 79 +++++++++++++++++++++++++++++---- src/include/storage/latch.h | 6 ++- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 5bb609b368d..c3aaa8bff03 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set) * - WL_SOCKET_CONNECTED: Wait for socket connection to be established, * can be combined with other WL_SOCKET_* events (on non-Windows * platforms, this is the same as WL_SOCKET_WRITEABLE) + * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer. * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies * * Returns the offset in WaitEventSet->events (starting from 0), which can be @@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) else { Assert(event->fd != PGINVALID_SOCKET); - Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + Assert(event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)); if (event->events & WL_SOCKET_READABLE) epoll_ev.events |= EPOLLIN; if (event->events & WL_SOCKET_WRITEABLE) epoll_ev.events |= EPOLLOUT; + if (event->events & WL_SOCKET_CLOSED) + epoll_ev.events |= EPOLLRDHUP; } /* @@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) } else { - Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + Assert(event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)); pollfd->events = 0; if (event->events & WL_SOCKET_READABLE) pollfd->events |= POLLIN; if (event->events & WL_SOCKET_WRITEABLE) pollfd->events |= POLLOUT; +#ifdef POLLRDHUP + if (event->events & WL_SOCKET_CLOSED) + pollfd->events |= POLLRDHUP; +#endif } Assert(event->fd != PGINVALID_SOCKET); @@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) Assert(event->events != WL_LATCH_SET || set->latch != NULL); Assert(event->events == WL_LATCH_SET || event->events == WL_POSTMASTER_DEATH || - (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))); + (event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED))); if (event->events == WL_POSTMASTER_DEATH) { @@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) * old event mask to the new event mask, since kevent treats readable * and writable as separate events. */ - if (old_events & WL_SOCKET_READABLE) + if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED)) old_filt_read = true; - if (event->events & WL_SOCKET_READABLE) + if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED)) new_filt_read = true; if (old_events & WL_SOCKET_WRITEABLE) old_filt_write = true; @@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) event); } - Assert(count > 0); + /* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */ + if (count == 0) + return; + Assert(count <= 2); rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL); @@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { Assert(cur_event->fd != PGINVALID_SOCKET); @@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_WRITEABLE; } + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP))) + { + /* remote peer shut down, or error */ + occurred_events->events |= WL_SOCKET_CLOSED; + } + if (occurred_events->events != 0) { occurred_events->fd = cur_event->fd; @@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events++; returned_events++; } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { Assert(cur_event->fd >= 0); @@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_READABLE; } + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_kqueue_event->filter == EVFILT_READ) && + (cur_kqueue_event->flags & EV_EOF)) + { + /* the remote peer has shut down */ + occurred_events->events |= WL_SOCKET_CLOSED; + } + if ((cur_event->events & WL_SOCKET_WRITEABLE) && (cur_kqueue_event->filter == EVFILT_WRITE)) { @@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { int errflags = POLLHUP | POLLERR | POLLNVAL; @@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_WRITEABLE; } +#ifdef POLLRDHUP + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_pollfd->revents & (POLLRDHUP | errflags))) + { + /* remote peer closed, or error */ + occurred_events->events |= WL_SOCKET_CLOSED; + } +#endif + if (occurred_events->events != 0) { occurred_events->fd = cur_event->fd; @@ -2014,6 +2060,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, } #endif +/* + * Return whether the current build options can report WL_SOCKET_CLOSED. + */ +bool +WaitEventSetCanReportClosed(void) +{ +#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \ + defined(WAIT_USE_EPOLL) || \ + defined(WAIT_USE_KQUEUE) + return true; +#else + return false; +#endif +} + /* * Get the number of wait events registered in a given WaitEventSet. */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 3aa7b338343..0dd79d73fa2 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -134,10 +134,11 @@ typedef struct Latch /* avoid having to deal with case on platforms not requiring it */ #define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE #endif - +#define WL_SOCKET_CLOSED (1 << 7) #define WL_SOCKET_MASK (WL_SOCKET_READABLE | \ WL_SOCKET_WRITEABLE | \ - WL_SOCKET_CONNECTED) + WL_SOCKET_CONNECTED | \ + WL_SOCKET_CLOSED) typedef struct WaitEvent { @@ -180,5 +181,6 @@ extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); extern void InitializeLatchWaitSet(void); extern int GetNumRegisteredWaitEvents(WaitEventSet *set); +extern bool WaitEventSetCanReportClosed(void); #endif /* LATCH_H */