diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e4ecbe37ee9..9c320ff1edf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -764,15 +764,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); - /* more than one block available */ - if (targetPagePtr + XLOG_BLCKSZ <= flushptr) - count = XLOG_BLCKSZ; - /* not enough WAL synced, that can happen during shutdown */ - else if (targetPagePtr + reqLen > flushptr) + /* fail if not (implies we are going to shut down) */ + if (flushptr < targetPagePtr + reqLen) return -1; - /* part of the page available */ + + if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + count = XLOG_BLCKSZ; /* more than one block available */ else - count = flushptr - targetPagePtr; + count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); @@ -1158,7 +1157,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } /* - * Wait till WAL < loc is flushed to disk so it can be safely read. + * Wait till WAL < loc is flushed to disk so it can be safely sent to client. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but + * if we detect a shutdown request (either from postmaster or client) + * we will return early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) @@ -1225,9 +1228,7 @@ WalSndWaitForWal(XLogRecPtr loc) RecentFlushPtr = GetXLogReplayRecPtr(NULL); /* - * If postmaster asked us to stop, don't wait here anymore. This will - * cause the xlogreader to return without reading a full record, which - * is the fastest way to reach the mainloop which then can quit. + * If postmaster asked us to stop, don't wait anymore. * * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting @@ -1258,14 +1259,20 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndCaughtUp = true; /* - * Try to flush pending output to the client. Also wait for the socket - * becoming writable, if there's still pending output after an attempt - * to flush. Otherwise we might just sit on output data while waiting - * for new WAL being generated. + * Try to flush any pending output to the client. */ if (pq_flush_if_writable() != 0) WalSndShutdown(); + /* + * If we have received CopyDone from the client, sent CopyDone + * ourselves, and the output buffer is empty, it's time to exit + * streaming, so fail the current WAL fetch request. + */ + if (streamingDoneReceiving && streamingDoneSending && + !pq_is_send_pending()) + break; + now = GetCurrentTimestamp(); /* die if timeout was reached */ @@ -1274,6 +1281,13 @@ WalSndWaitForWal(XLogRecPtr loc) /* Send keepalive if the time has come */ WalSndKeepaliveIfNecessary(now); + /* + * Sleep until something happens or we time out. Also wait for the + * socket becoming writable, if there's still pending output. + * Otherwise we might sit on sendable output data while waiting for + * new WAL to be generated. (But if we have nothing to send, we don't + * want to wake on socket-writable.) + */ sleeptime = WalSndComputeSleeptime(now); wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | @@ -1282,7 +1296,6 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - /* Sleep until something happens or we time out */ WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); } @@ -1870,7 +1883,8 @@ WalSndLoop(WalSndSendDataCallback send_data) * ourselves, and the output buffer is empty, it's time to exit * streaming. */ - if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving) + if (streamingDoneReceiving && streamingDoneSending && + !pq_is_send_pending()) break; /*