diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index de03362c91c..37b481c0020 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -168,13 +168,18 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, status = PGRES_POLLING_WRITING; do { - /* Wait for socket ready and/or other events. */ int io_flag; int rc; - io_flag = (status == PGRES_POLLING_READING - ? WL_SOCKET_READABLE - : WL_SOCKET_WRITEABLE); + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + /* Windows needs a different test while waiting for connection-made */ + else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 07b1364de8f..4eb6e83682e 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -344,9 +344,9 @@ WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, * Like WaitLatch, but with an extra socket argument for WL_SOCKET_* * conditions. * - * When waiting on a socket, EOF and error conditions are reported by - * returning the socket as readable/writable or both, depending on - * WL_SOCKET_READABLE/WL_SOCKET_WRITEABLE being specified. + * When waiting on a socket, EOF and error conditions always cause the socket + * to be reported as readable/writable/connected, so that the caller can deal + * with the condition. * * NB: These days this is just a wrapper around the WaitEventSet API. When * using a latch very frequently, consider creating a longer living @@ -374,11 +374,11 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); - if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + if (wakeEvents & WL_SOCKET_MASK) { int ev; - ev = wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); + ev = wakeEvents & WL_SOCKET_MASK; AddWaitEventToSet(set, ev, sock, NULL, NULL); } @@ -390,8 +390,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, { ret |= event.events & (WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_READABLE | - WL_SOCKET_WRITEABLE); + WL_SOCKET_MASK); } FreeWaitEventSet(set); @@ -640,10 +639,13 @@ FreeWaitEventSet(WaitEventSet *set) * Add an event to the set. Possible events are: * - WL_LATCH_SET: Wait for the latch to be set * - WL_POSTMASTER_DEATH: Wait for postmaster to die - * - WL_SOCKET_READABLE: Wait for socket to become readable - * can be combined in one event with WL_SOCKET_WRITEABLE - * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable - * can be combined with WL_SOCKET_READABLE + * - WL_SOCKET_READABLE: Wait for socket to become readable, + * can be combined in one event with other WL_SOCKET_* events + * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable, + * can be combined with other WL_SOCKET_* events + * - WL_SOCKET_CONNECTED: Wait for socket connection to be established, + * can be combined with other WL_SOCKET_* events (on non-Windows + * platforms, this is the same as WL_SOCKET_WRITEABLE) * * Returns the offset in WaitEventSet->events (starting from 0), which can be * used to modify previously added wait events using ModifyWaitEvent(). @@ -652,9 +654,9 @@ FreeWaitEventSet(WaitEventSet *set) * i.e. it must be a process-local latch initialized with InitLatch, or a * shared latch associated with the current process by calling OwnLatch. * - * In the WL_SOCKET_READABLE/WRITEABLE case, EOF and error conditions are - * reported by returning the socket as readable/writable or both, depending on - * WL_SOCKET_READABLE/WRITEABLE being specified. + * In the WL_SOCKET_READABLE/WRITEABLE/CONNECTED cases, EOF and error + * conditions cause the socket to be reported as readable/writable/connected, + * so that the caller can deal with the condition. * * The user_data pointer specified here will be set for the events returned * by WaitEventSetWait(), allowing to easily associate additional data with @@ -685,8 +687,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, } /* waiting for socket readiness without a socket indicates a bug */ - if (fd == PGINVALID_SOCKET && - (events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))) + if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK)) elog(ERROR, "cannot wait on socket event without a socket"); event = &set->events[set->nevents]; @@ -885,6 +886,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) flags |= FD_READ; if (event->events & WL_SOCKET_WRITEABLE) flags |= FD_WRITE; + if (event->events & WL_SOCKET_CONNECTED) + flags |= FD_CONNECT; if (*handle == WSA_INVALID_EVENT) { @@ -1395,7 +1398,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & WL_SOCKET_MASK) { WSANETWORKEVENTS resEvents; HANDLE handle = set->handles[cur_event->pos + 1]; @@ -1432,13 +1435,16 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, /* writeable */ occurred_events->events |= WL_SOCKET_WRITEABLE; } + if ((cur_event->events & WL_SOCKET_CONNECTED) && + (resEvents.lNetworkEvents & FD_CONNECT)) + { + /* connected */ + occurred_events->events |= WL_SOCKET_CONNECTED; + } if (resEvents.lNetworkEvents & FD_CLOSE) { - /* EOF */ - if (cur_event->events & WL_SOCKET_READABLE) - occurred_events->events |= WL_SOCKET_READABLE; - if (cur_event->events & WL_SOCKET_WRITEABLE) - occurred_events->events |= WL_SOCKET_WRITEABLE; + /* EOF/error, so signal all caller-requested socket flags */ + occurred_events->events |= (cur_event->events & WL_SOCKET_MASK); } if (occurred_events->events != 0) diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 73abfafec56..a43193c916b 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -126,6 +126,16 @@ typedef struct Latch #define WL_SOCKET_WRITEABLE (1 << 2) #define WL_TIMEOUT (1 << 3) /* not for WaitEventSetWait() */ #define WL_POSTMASTER_DEATH (1 << 4) +#ifdef WIN32 +#define WL_SOCKET_CONNECTED (1 << 5) +#else +/* avoid having to to deal with case on platforms not requiring it */ +#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE +#endif + +#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \ + WL_SOCKET_WRITEABLE | \ + WL_SOCKET_CONNECTED) typedef struct WaitEvent {