diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 40ec0e17dc5..e2a2ebb30f9 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -480,6 +480,11 @@ LogStreamerMain(logstreamer_param *param) stream.timeline = param->timeline; stream.sysidentifier = param->sysidentifier; stream.stream_stop = reached_end_position; +#ifndef WIN32 + stream.stop_socket = bgpipe[0]; +#else + stream.stop_socket = PGINVALID_SOCKET; +#endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = false; stream.do_sync = do_sync; diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 1a9fe81be14..09385c5cbfc 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -409,6 +409,7 @@ StreamLog(void) stream.timeline); stream.stream_stop = stop_streaming; + stream.stop_socket = PGINVALID_SOCKET; stream.standby_message_timeout = standby_message_timeout; stream.synchronous = synchronous; stream.do_sync = true; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 8511e57cf7d..c41bba28cdf 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -39,8 +39,9 @@ static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos); -static int CopyStreamPoll(PGconn *conn, long timeout_ms); -static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); +static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket); +static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, + char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, @@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn) * return. As long as it returns false, streaming will continue * indefinitely. * + * If stream_stop() checks for external input, stop_socket should be set to + * the FD it checks. This will allow such input to be detected promptly + * rather than after standby_message_timeout (which might be indefinite). + * Note that signals will interrupt waits for input as well, but that is + * race-y since a signal received while busy won't interrupt the wait. + * * standby_message_timeout controls how often we send a message * back to the master letting it know our progress, in milliseconds. + * Zero means no messages are sent. * This message will only contain the write location, and never * flush or replay. * @@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); - r = CopyStreamReceive(conn, sleeptime, ©buf); + r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf); while (r != 0) { if (r == -1) @@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * Process the received data, and any subsequent data we can read * without blocking. */ - r = CopyStreamReceive(conn, 0, ©buf); + r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf); } } @@ -881,20 +889,25 @@ error: } /* - * Wait until we can read CopyData message, or timeout. + * Wait until we can read a CopyData message, + * or timeout, or occurrence of a signal or input on the stop_socket. + * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.) * * Returns 1 if data has become available for reading, 0 if timed out - * or interrupted by signal, and -1 on an error. + * or interrupted by signal or stop_socket input, and -1 on an error. */ static int -CopyStreamPoll(PGconn *conn, long timeout_ms) +CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) { int ret; fd_set input_mask; + int connsocket; + int maxfd; struct timeval timeout; struct timeval *timeoutptr; - if (PQsocket(conn) < 0) + connsocket = PQsocket(conn); + if (connsocket < 0) { fprintf(stderr, _("%s: invalid socket: %s"), progname, PQerrorMessage(conn)); @@ -902,7 +915,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms) } FD_ZERO(&input_mask); - FD_SET(PQsocket(conn), &input_mask); + FD_SET(connsocket, &input_mask); + maxfd = connsocket; + if (stop_socket != PGINVALID_SOCKET) + { + FD_SET(stop_socket, &input_mask); + maxfd = Max(maxfd, stop_socket); + } if (timeout_ms < 0) timeoutptr = NULL; @@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms) timeoutptr = &timeout; } - ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); - if (ret == 0 || (ret < 0 && errno == EINTR)) - return 0; /* Got a timeout or signal */ - else if (ret < 0) + ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr); + + if (ret < 0) { + if (errno == EINTR) + return 0; /* Got a signal, so not an error */ fprintf(stderr, _("%s: select() failed: %s\n"), progname, strerror(errno)); return -1; } + if (ret > 0 && FD_ISSET(connsocket, &input_mask)) + return 1; /* Got input on connection socket */ - return 1; + return 0; /* Got timeout or input on stop_socket */ } /* @@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms) * point to a buffer holding the received message. The buffer is only valid * until the next CopyStreamReceive call. * - * 0 if no data was available within timeout, or wait was interrupted - * by signal. -1 on error. -2 if the server ended the COPY. + * Returns 0 if no data was available within timeout, or if wait was + * interrupted by signal or stop_socket input. + * -1 on error. -2 if the server ended the COPY. */ static int -CopyStreamReceive(PGconn *conn, long timeout, char **buffer) +CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, + char **buffer) { char *copybuf = NULL; int rawlen; @@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer) rawlen = PQgetCopyData(conn, ©buf, 1); if (rawlen == 0) { + int ret; + /* - * No data available. Wait for some to appear, but not longer than the - * specified timeout, so that we can ping the server. + * No data available. Wait for some to appear, but not longer than + * the specified timeout, so that we can ping the server. Also stop + * waiting if input appears on stop_socket. */ - if (timeout != 0) - { - int ret; + ret = CopyStreamPoll(conn, timeout, stop_socket); + if (ret <= 0) + return ret; - ret = CopyStreamPoll(conn, timeout); - if (ret <= 0) - return ret; - } - - /* Else there is actually data on the socket */ + /* Now there is actually data on the socket */ if (PQconsumeInput(conn) == 0) { fprintf(stderr, diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 42e93ac7454..9a51d9a9c49 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -42,6 +42,9 @@ typedef struct StreamCtl stream_stop_callback stream_stop; /* Stop streaming when returns true */ + pgsocket stop_socket; /* if valid, watch for input on this socket + * and check stream_stop() when there is any */ + WalWriteMethod *walmethod; /* How to write the WAL */ char *partial_suffix; /* Suffix appended to partially received files */ char *replication_slot; /* Replication slot to use, or NULL */