diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3497269850d..32cc52dbfd7 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -476,7 +476,7 @@ ProcessRepliesIfAny(void) { unsigned char firstchar; int r; - int received = false; + bool received = false; for (;;) { @@ -700,6 +700,9 @@ WalSndLoop(void) /* Loop forever, unless we get an error */ for (;;) { + /* Clear any already-pending wakeups */ + ResetLatch(&MyWalSnd->latch); + /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. @@ -718,60 +721,81 @@ WalSndLoop(void) /* Normal exit from the walsender is here */ if (walsender_shutdown_requested) { - /* Inform the standby that XLOG streaming was done */ + /* Inform the standby that XLOG streaming is done */ pq_puttextmessage('C', "COPY 0"); pq_flush(); proc_exit(0); } + /* Check for input from the client */ + ProcessRepliesIfAny(); + /* * If we don't have any pending data in the output buffer, try to send - * some more. + * some more. If there is some, we don't bother to call XLogSend + * again until we've flushed it ... but we'd better assume we are not + * caught up. */ if (!pq_is_send_pending()) - { XLogSend(output_message, &caughtup); + else + caughtup = false; - /* - * Even if we wrote all the WAL that was available when we started - * sending, more might have arrived while we were sending this - * batch. We had the latch set while sending, so we have not - * received any signals from that time. Let's arm the latch again, - * and after that check that we're still up-to-date. - */ - if (caughtup && !pq_is_send_pending()) - { - ResetLatch(&MyWalSnd->latch); - - XLogSend(output_message, &caughtup); - } - } - - /* Flush pending output to the client */ + /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) break; - /* - * When SIGUSR2 arrives, we send any outstanding logs up to the - * shutdown checkpoint record (i.e., the latest record) and exit. - */ - if (walsender_ready_to_stop && !pq_is_send_pending()) + /* If nothing remains to be sent right now ... */ + if (caughtup && !pq_is_send_pending()) { - XLogSend(output_message, &caughtup); - ProcessRepliesIfAny(); - if (caughtup && !pq_is_send_pending()) - walsender_shutdown_requested = true; + /* + * If we're in catchup state, move to streaming. This is an + * important state change for users to know about, since before + * this point data loss might occur if the primary dies and we + * need to failover to the standby. The state change is also + * important for synchronous replication, since commits that + * started to wait at that point might wait for some time. + */ + if (MyWalSnd->state == WALSNDSTATE_CATCHUP) + { + ereport(DEBUG1, + (errmsg("standby \"%s\" has now caught up with primary", + application_name))); + WalSndSetState(WALSNDSTATE_STREAMING); + } + + /* + * When SIGUSR2 arrives, we send any outstanding logs up to the + * shutdown checkpoint record (i.e., the latest record) and exit. + * This may be a normal termination at shutdown, or a promotion, + * the walsender is not sure which. + */ + if (walsender_ready_to_stop) + { + /* ... let's just be real sure we're caught up ... */ + XLogSend(output_message, &caughtup); + if (caughtup && !pq_is_send_pending()) + { + walsender_shutdown_requested = true; + continue; /* don't want to wait more */ + } + } } - if ((caughtup || pq_is_send_pending()) && - !got_SIGHUP && - !walsender_shutdown_requested) + /* + * We don't block if not caught up, unless there is unsent data + * pending in which case we'd better block until the socket is + * write-ready. This test is only needed for the case where XLogSend + * loaded a subset of the available data but then pq_flush_if_writable + * flushed it all --- we should immediately try to send more. + */ + if (caughtup || pq_is_send_pending()) { TimestampTz finish_time = 0; - long sleeptime; + long sleeptime = -1; - /* Reschedule replication timeout */ + /* Determine time until replication timeout */ if (replication_timeout > 0) { long secs; @@ -795,12 +819,16 @@ WalSndLoop(void) sleeptime = WalSndDelay; } - /* Sleep */ + /* Sleep until something happens or replication timeout */ WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, true, pq_is_send_pending(), sleeptime); - /* Check for replication timeout */ + /* + * Check for replication timeout. Note we ignore the corner case + * possibility that the client replied just as we reached the + * timeout ... he's supposed to reply *before* that. + */ if (replication_timeout > 0 && GetCurrentTimestamp() >= finish_time) { @@ -814,24 +842,6 @@ WalSndLoop(void) break; } } - - /* - * If we're in catchup state, see if its time to move to streaming. - * This is an important state change for users, since before this - * point data loss might occur if the primary dies and we need to - * failover to the standby. The state change is also important for - * synchronous replication, since commits that started to wait at that - * point might wait for some time. - */ - if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup) - { - ereport(DEBUG1, - (errmsg("standby \"%s\" has now caught up with primary", - application_name))); - WalSndSetState(WALSNDSTATE_STREAMING); - } - - ProcessRepliesIfAny(); } /*