diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fa1db748b5e..6a252fcf454 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1151,6 +1151,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) { + TimestampTz now; + /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); @@ -1160,23 +1162,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * several releases by streaming physical replication. */ resetStringInfo(&tmpbuf); - pq_sendint64(&tmpbuf, GetCurrentTimestamp()); + now = GetCurrentTimestamp(); + pq_sendint64(&tmpbuf, now); memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - /* fast path */ + CHECK_FOR_INTERRUPTS(); + /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); - if (!pq_is_send_pending()) + /* Try taking fast path unless we get too close to walsender timeout. */ + if (now < TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2) && + !pq_is_send_pending()) + { return; + } + /* If we have pending write here, go to slow path */ for (;;) { int wakeEvents; long sleeptime; - TimestampTz now; + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + now = GetCurrentTimestamp(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(now); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(now); + + if (!pq_is_send_pending()) + break; + + sleeptime = WalSndComputeSleeptime(now); + + wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | + WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; + + /* Sleep until something happens or we time out */ + WaitLatchOrSocket(MyLatch, wakeEvents, + MyProcPort->sock, sleeptime, + WAIT_EVENT_WAL_SENDER_WRITE_DATA); /* * Emergency bailout if postmaster has died. This is to avoid the @@ -1198,34 +1231,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, SyncRepInitConfig(); } - /* Check for input from the client */ - ProcessRepliesIfAny(); - /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); - - /* If we finished clearing the buffered data, we're done here. */ - if (!pq_is_send_pending()) - break; - - now = GetCurrentTimestamp(); - - /* die if timeout was reached */ - WalSndCheckTimeOut(now); - - /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(now); - - sleeptime = WalSndComputeSleeptime(now); - - wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; - - /* Sleep until something happens or we time out */ - WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_WRITE_DATA); } /* reactivate latch so WalSndLoop knows to continue */